This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 36c9740299 Checks for corruption earlier and always report errors (#5227) 36c9740299 is described below commit 36c9740299361874769cb18683c5130153dc488c Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Jan 8 11:07:16 2025 -0500 Checks for corruption earlier and always report errors (#5227) Was looking into an issue where a mutation was corrupted on the network. This caused the mutation to be written to the write ahead log and then a failure occurred in the tablet server which left the tablet in an inconsistent state. Modified the tablet server code to deserialize mutations as early as possible (it used to deserialize after writing to the walog, now it does it before). Wrote an IT to recreate this problem and found another bug. Writing data to Accumulo does the following. 1. Make a startUpdate RPC to create an update session 2. Make one or more applyUpdates RPCs to add data to the session. These RPCs are thrift oneway calls, so nothing is reported back. 3. Call closeUpdate on the session to see what happened with all of the applyUpdates RPCs done in step 2. If an unexpected exception happened in step 2 above then it would not be reported back to the client. These changes fix and test that as part of testing the corrupt mutation. After these changes if there was an error in step 2, then step 3 now throws an exception. 
--- .../accumulo/tserver/TabletClientHandler.java | 32 ++++- .../accumulo/tserver/session/UpdateSession.java | 1 + .../apache/accumulo/test/CorruptMutationIT.java | 149 +++++++++++++++++++++ 3 files changed, 180 insertions(+), 2 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 087c733e6f..c6b3873902 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -267,8 +267,8 @@ public class TabletClientHandler implements TabletClientService.Iface { if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) { return; } - if (us.currentTablet == null - && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) { + if (us.currentTablet == null && (us.failures.containsKey(keyExtent) + || us.authFailures.containsKey(keyExtent) || us.unhandledException != null)) { // if there were previous failures, then do not accept additional writes return; } @@ -339,6 +339,11 @@ public class TabletClientHandler implements TabletClientService.Iface { List<Mutation> mutations = us.queuedMutations.get(us.currentTablet); for (TMutation tmutation : tmutations) { Mutation mutation = new ServerMutation(tmutation); + // Deserialize the mutation in an attempt to check for data corruption that happened on + // the network. This will avoid writing a corrupt mutation to the write ahead log and + // failing after its written to the write ahead log when it is deserialized to update the + // in memory map. 
+ mutation.getUpdates(); mutations.add(mutation); additionalMutationSize += mutation.numBytes(); } @@ -358,6 +363,15 @@ public class TabletClientHandler implements TabletClientService.Iface { } } } + } catch (RuntimeException e) { + // This method is a thrift oneway method so an exception from it will not make it back to the + // client. Need to record the exception and set the session such that any future updates to + // the session are ignored. + us.unhandledException = e; + us.currentTablet = null; + + // Rethrowing it will cause logging from thrift, so not adding logging here. + throw e; } finally { if (reserved) { server.sessionManager.unreserveSession(us); @@ -541,6 +555,20 @@ public class TabletClientHandler implements TabletClientService.Iface { } try { + if (us.unhandledException != null) { + // Since flush() is not being called, any memory added to the global queued mutations + // counter will not be decremented. So do that here before throwing an exception. + server.updateTotalQueuedMutationSize(-us.queuedMutationSize); + us.queuedMutationSize = 0; + // make this memory available for GC + us.queuedMutations.clear(); + + // Something unexpected happened during this write session, so throw an exception here to + // cause a TApplicationException on the client side. 
+ throw new IllegalStateException( + "Write session " + updateID + " saw an unexpected exception", us.unhandledException); + } + // clients may or may not see data from an update session while // it is in progress, however when the update session is closed // want to ensure that reads wait for the write to finish diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java index ca487294d7..dc32765c28 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java @@ -50,6 +50,7 @@ public class UpdateSession extends Session { public long flushTime = 0; public long queuedMutationSize = 0; public final Durability durability; + public Exception unhandledException = null; public UpdateSession(TservConstraintEnv env, TCredentials credentials, Durability durability) { super(credentials); diff --git a/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java b/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java new file mode 100644 index 0000000000..552cd46b33 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TMutation; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.tabletserver.thrift.TDurability; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TServiceClient; +import org.junit.jupiter.api.Test; + +public 
class CorruptMutationIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "10"); + } + + @Test + public void testCorruptMutation() throws Exception { + + String table = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + c.tableOperations().create(table); + try (BatchWriter writer = c.createBatchWriter(table)) { + Mutation m = new Mutation("1"); + m.put("f1", "q1", new Value("v1")); + writer.addMutation(m); + } + + var ctx = (ClientContext) c; + var tableId = ctx.getTableId(table); + var extent = new KeyExtent(tableId, null, null); + var tabletMetadata = ctx.getAmple().readTablet(extent, TabletMetadata.ColumnType.LOCATION); + var location = tabletMetadata.getLocation(); + assertNotNull(location); + assertEquals(TabletMetadata.LocationType.CURRENT, location.getType()); + + TabletClientService.Iface client = + ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, location.getHostAndPort(), ctx); + // Make the same RPC calls made by the BatchWriter, but pass a corrupt serialized mutation in + // this try block. + try { + TInfo tinfo = TraceUtil.traceInfo(); + + long sessionId = client.startUpdate(tinfo, ctx.rpcCreds(), TDurability.DEFAULT); + + // Write two valid mutations to the session. The tserver buffers data it receives via + // applyUpdates and may not write them until closeUpdate RPC is called. Because + // TSERV_TOTAL_MUTATION_QUEUE_MAX was set so small, these values should be written. + client.applyUpdates(tinfo, sessionId, extent.toThrift(), + List.of(createTMutation("abc", "z1"), createTMutation("def", "z2"))); + + // Simulate data corruption in the serialized mutation + TMutation badMutation = createTMutation("ghi", "z3"); + badMutation.entries = -42; + + // Write some good and bad mutations to the session. 
The server side will see an error here, + // however since this is a thrift oneway method no exception is expected here. This should + // leave the session in a broken state where it no longer accepts any new data. + client.applyUpdates(tinfo, sessionId, extent.toThrift(), + List.of(createTMutation("jkl", "z4"), badMutation, createTMutation("mno", "z5"))); + + // Write two more valid mutations to the session, these should be dropped on the server side + // because of the previous error. So should never see these updates. + client.applyUpdates(tinfo, sessionId, extent.toThrift(), + List.of(createTMutation("pqr", "z6"), createTMutation("stu", "z7"))); + + // Since client.applyUpdates experienced an error, should see an error when closing the + // session. + assertThrows(TApplicationException.class, () -> client.closeUpdate(tinfo, sessionId)); + } finally { + ThriftUtil.returnClient((TServiceClient) client, ctx); + } + + // The values that a scan must see + Set<String> expectedValues = Set.of("v1", "v2", "z1", "z2"); + + // The failed mutation should not have left the tablet in a bad state. Do some follow-on + // actions to ensure the tablet is still functional. + try (BatchWriter writer = c.createBatchWriter(table)) { + Mutation m = new Mutation("2"); + m.put("f1", "q1", new Value("v2")); + writer.addMutation(m); + } + + try (Scanner scanner = c.createScanner(table)) { + var valuesSeen = + scanner.stream().map(e -> e.getValue().toString()).collect(Collectors.toSet()); + assertEquals(expectedValues, valuesSeen); + } + + c.tableOperations().flush(table, null, null, true); + + try (Scanner scanner = c.createScanner(table)) { + var valuesSeen = + scanner.stream().map(e -> e.getValue().toString()).collect(Collectors.toSet()); + assertEquals(expectedValues, valuesSeen); + } + } + } + + private static TMutation createTMutation(String row, String value) { + Mutation m = new Mutation(row); + m.put("x", "y", value); + return m.toThrift(); + } +}