This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new c78dfc5235 ensures no writes happen after batch writer closes (#3733) c78dfc5235 is described below commit c78dfc5235f74c03d149971f2fb63e55ed27c6c1 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Aug 31 07:08:14 2023 -0400 ensures no writes happen after batch writer closes (#3733) Fixes a problem with the batch writer where, when retries happened, writes could possibly happen after the batch writer was closed. Adds a test that causes writes after close without the fixes in this PR. This fixes #3721 --------- Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../core/clientImpl/TabletServerBatchWriter.java | 260 +++++++++++++++------ .../accumulo/tserver/TabletClientHandler.java | 97 ++++---- .../apache/accumulo/test/WriteAfterCloseIT.java | 203 ++++++++++++++++ 3 files changed, 445 insertions(+), 115 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index c5740c02f8..de66339886 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -19,6 +19,8 @@ package org.apache.accumulo.core.clientImpl; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; @@ -35,15 +37,19 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.OptionalLong; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ScheduledFuture; import 
java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriterConfig; @@ -65,14 +71,15 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.dataImpl.thrift.TMutation; import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors; +import org.apache.accumulo.core.fate.zookeeper.ServiceLock; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; -import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.thrift.TApplicationException; @@ -160,7 +167,7 @@ public class TabletServerBatchWriter implements AutoCloseable { private final HashSet<String> serverSideErrors = new HashSet<>(); private final FailedMutations failedMutations; private int unknownErrors = 0; - private boolean somethingFailed = false; + private final AtomicBoolean somethingFailed = new AtomicBoolean(false); private 
Exception lastUnknownError = null; private static class TimeoutTracker { @@ -263,7 +270,7 @@ public class TabletServerBatchWriter implements AutoCloseable { checkForFailures(); - waitRTE(() -> (totalMemUsed > maxMem || flushing) && !somethingFailed); + waitRTE(() -> (totalMemUsed > maxMem || flushing) && !somethingFailed.get()); // do checks again since things could have changed while waiting and not holding lock if (closed) { @@ -323,7 +330,7 @@ public class TabletServerBatchWriter implements AutoCloseable { if (flushing) { // some other thread is currently flushing, so wait - waitRTE(() -> flushing && !somethingFailed); + waitRTE(() -> flushing && !somethingFailed.get()); checkForFailures(); @@ -335,7 +342,7 @@ public class TabletServerBatchWriter implements AutoCloseable { startProcessing(); checkForFailures(); - waitRTE(() -> totalMemUsed > 0 && !somethingFailed); + waitRTE(() -> totalMemUsed > 0 && !somethingFailed.get()); flushing = false; this.notifyAll(); @@ -362,7 +369,7 @@ public class TabletServerBatchWriter implements AutoCloseable { startProcessing(); - waitRTE(() -> totalMemUsed > 0 && !somethingFailed); + waitRTE(() -> totalMemUsed > 0 && !somethingFailed.get()); logStats(); @@ -508,7 +515,7 @@ public class TabletServerBatchWriter implements AutoCloseable { private void updatedConstraintViolations(List<ConstraintViolationSummary> cvsList) { if (!cvsList.isEmpty()) { synchronized (this) { - somethingFailed = true; + somethingFailed.set(true); violations.add(cvsList); this.notifyAll(); } @@ -524,7 +531,7 @@ public class TabletServerBatchWriter implements AutoCloseable { .forEach(context::requireNotDeleted); synchronized (this) { - somethingFailed = true; + somethingFailed.set(true); // add these authorizationFailures to those collected by this batch writer authorizationFailures.forEach((ke, code) -> this.authorizationFailures .computeIfAbsent(ke, k -> new HashSet<>()).add(code)); @@ -534,14 +541,14 @@ public class TabletServerBatchWriter implements 
AutoCloseable { } private synchronized void updateServerErrors(String server, Exception e) { - somethingFailed = true; + somethingFailed.set(true); this.serverSideErrors.add(server); this.notifyAll(); log.error("Server side error on {}", server, e); } private synchronized void updateUnknownErrors(String msg, Exception t) { - somethingFailed = true; + somethingFailed.set(true); unknownErrors++; this.lastUnknownError = t; this.notifyAll(); @@ -554,7 +561,7 @@ public class TabletServerBatchWriter implements AutoCloseable { } private void checkForFailures() throws MutationsRejectedException { - if (somethingFailed) { + if (somethingFailed.get()) { List<ConstraintViolationSummary> cvsList = violations.asList(); HashMap<TabletId,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>> af = new HashMap<>(); @@ -871,7 +878,15 @@ public class TabletServerBatchWriter implements AutoCloseable { } long st1 = System.currentTimeMillis(); - failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker); + try (SessionCloser sessionCloser = new SessionCloser(location)) { + failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker, + sessionCloser); + } catch (ThriftSecurityException e) { + updateAuthorizationFailures( + mutationBatch.keySet().stream().collect(toMap(identity(), ke -> e.code))); + throw new AccumuloSecurityException(e.user, e.code, e); + } + long st2 = System.currentTimeMillis(); if (log.isTraceEnabled()) { log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in " @@ -901,9 +916,7 @@ public class TabletServerBatchWriter implements AutoCloseable { span.end(); } } catch (IOException e) { - if (log.isTraceEnabled()) { - log.trace("failed to send mutations to {} : {}", location, e.getMessage()); - } + log.debug("failed to send mutations to {} : {}", location, e.getMessage()); HashSet<TableId> tables = new HashSet<>(); for (KeyExtent ke : mutationBatch.keySet()) { @@ -922,7 +935,8 @@ public class 
TabletServerBatchWriter implements AutoCloseable { } private MutationSet sendMutationsToTabletServer(String location, - Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker) + Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker, + SessionCloser sessionCloser) throws IOException, AccumuloSecurityException, AccumuloServerException { if (tabMuts.isEmpty()) { return new MutationSet(); @@ -931,6 +945,8 @@ public class TabletServerBatchWriter implements AutoCloseable { timeoutTracker.startingWrite(); + // If there is an open session, must close it before the batchwriter closes or writes could + // happen after the batch writer closes. See #3721 try { final HostAndPort parsedServer = HostAndPort.fromString(location); final TabletClientService.Iface client; @@ -945,81 +961,71 @@ public class TabletServerBatchWriter implements AutoCloseable { try { MutationSet allFailures = new MutationSet(); - if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) { - Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next(); + // set the session on the sessionCloser so that any failures after this point will close + // the session if needed + sessionCloser.setSession( + client.startUpdate(tinfo, context.rpcCreds(), DurabilityImpl.toThrift(durability))); + + List<TMutation> updates = new ArrayList<>(); + for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) { + long size = 0; + Iterator<Mutation> iter = entry.getValue().iterator(); + while (iter.hasNext()) { + while (size < MUTATION_BATCH_SIZE && iter.hasNext()) { + Mutation mutation = iter.next(); + updates.add(mutation.toThrift()); + size += mutation.numBytes(); + } - try { - client.update(tinfo, context.rpcCreds(), entry.getKey().toThrift(), - entry.getValue().get(0).toThrift(), DurabilityImpl.toThrift(durability)); - } catch (NotServingTabletException e) { - allFailures.addAll(entry.getKey().tableId(), entry.getValue()); - 
getLocator(entry.getKey().tableId()).invalidateCache(entry.getKey()); - } catch (ConstraintViolationException e) { - updatedConstraintViolations(e.violationSummaries.stream() - .map(ConstraintViolationSummary::new).collect(toList())); + client.applyUpdates(tinfo, sessionCloser.getSession(), entry.getKey().toThrift(), + updates); + updates.clear(); + size = 0; } - timeoutTracker.madeProgress(); - } else { + } - long usid = - client.startUpdate(tinfo, context.rpcCreds(), DurabilityImpl.toThrift(durability)); - - List<TMutation> updates = new ArrayList<>(); - for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) { - long size = 0; - Iterator<Mutation> iter = entry.getValue().iterator(); - while (iter.hasNext()) { - while (size < MUTATION_BATCH_SIZE && iter.hasNext()) { - Mutation mutation = iter.next(); - updates.add(mutation.toThrift()); - size += mutation.numBytes(); - } + UpdateErrors updateErrors = client.closeUpdate(tinfo, sessionCloser.getSession()); - client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates); - updates.clear(); - size = 0; - } - } + // the write completed successfully so no need to close the session + sessionCloser.clearSession(); - UpdateErrors updateErrors = client.closeUpdate(tinfo, usid); - - // @formatter:off + // @formatter:off Map<KeyExtent,Long> failures = updateErrors.failedExtents.entrySet().stream().collect(toMap( entry -> KeyExtent.fromThrift(entry.getKey()), Entry::getValue )); // @formatter:on - updatedConstraintViolations(updateErrors.violationSummaries.stream() - .map(ConstraintViolationSummary::new).collect(toList())); - // @formatter:off + updatedConstraintViolations(updateErrors.violationSummaries.stream() + .map(ConstraintViolationSummary::new).collect(toList())); + // @formatter:off updateAuthorizationFailures(updateErrors.authorizationFailures.entrySet().stream().collect(toMap( entry -> KeyExtent.fromThrift(entry.getKey()), Entry::getValue ))); // @formatter:on - long totalCommitted = 0; + long 
totalCommitted = 0; - for (Entry<KeyExtent,Long> entry : failures.entrySet()) { - KeyExtent failedExtent = entry.getKey(); - int numCommitted = (int) (long) entry.getValue(); - totalCommitted += numCommitted; + for (Entry<KeyExtent,Long> entry : failures.entrySet()) { + KeyExtent failedExtent = entry.getKey(); + int numCommitted = (int) (long) entry.getValue(); + totalCommitted += numCommitted; - TableId tableId = failedExtent.tableId(); + TableId tableId = failedExtent.tableId(); - getLocator(tableId).invalidateCache(failedExtent); + getLocator(tableId).invalidateCache(failedExtent); - List<Mutation> mutations = tabMuts.get(failedExtent); - allFailures.addAll(tableId, mutations.subList(numCommitted, mutations.size())); - } + List<Mutation> mutations = tabMuts.get(failedExtent); + allFailures.addAll(tableId, mutations.subList(numCommitted, mutations.size())); + } - if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) { - // nothing was successfully written - timeoutTracker.wroteNothing(); - } else { - // successfully wrote something to tablet server - timeoutTracker.madeProgress(); - } + if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) { + // nothing was successfully written + timeoutTracker.wroteNothing(); + } else { + // successfully wrote something to tablet server + timeoutTracker.madeProgress(); } + return allFailures; } finally { ThriftUtil.returnClient((TServiceClient) client, context); @@ -1028,9 +1034,13 @@ public class TabletServerBatchWriter implements AutoCloseable { timeoutTracker.errorOccured(); throw new IOException(e); } catch (TApplicationException tae) { + // no need to close the session when unretryable errors happen + sessionCloser.clearSession(); updateServerErrors(location, tae); throw new AccumuloServerException(location, tae); } catch (ThriftSecurityException e) { + // no need to close the session when unretryable errors happen + sessionCloser.clearSession(); updateAuthorizationFailures( 
tabMuts.keySet().stream().collect(toMap(identity(), ke -> e.code))); throw new AccumuloSecurityException(e.user, e.code, e); @@ -1038,6 +1048,114 @@ public class TabletServerBatchWriter implements AutoCloseable { throw new IOException(e); } } + + class SessionCloser implements AutoCloseable { + + private final String location; + private OptionalLong usid; + + SessionCloser(String location) { + this.location = location; + usid = OptionalLong.empty(); + } + + void setSession(long usid) { + this.usid = OptionalLong.of(usid); + } + + public long getSession() { + return usid.getAsLong(); + } + + void clearSession() { + usid = OptionalLong.empty(); + } + + @Override + public void close() throws ThriftSecurityException { + if (usid.isPresent()) { + try { + closeSession(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + } + + /** + * Checks if there is a lock held by a tserver at a specific host and port. + */ + private boolean isALockHeld(String tserver) { + var root = context.getZooKeeperRoot() + Constants.ZTSERVERS; + var zLockPath = ServiceLock.path(root + "/" + tserver); + return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0; + } + + private void closeSession() throws InterruptedException, ThriftSecurityException { + + Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) + .incrementBy(100, MILLISECONDS).maxWait(60, SECONDS).backOffFactor(1.5) + .logInterval(3, MINUTES).createRetry(); + + final HostAndPort parsedServer = HostAndPort.fromString(location); + + long startTime = System.nanoTime(); + + // If somethingFailed is true then the batch writer will throw an exception on close or + // flush, so no need to close this session. Only want to close the session for retryable + // exceptions. + while (!somethingFailed.get()) { + + TabletClientService.Client client = null; + + // Check if a lock is held by any tserver at the host and port. 
It does not need to be the + // exact tserver instance that existed when the session was created because if a new + // tserver instance comes up then the session will not exist there. Trying to get the + // exact tserver instance that created the session would require changes to the RPC that + // creates the session and this is not needed. + if (!isALockHeld(location)) { + retry.logCompletion(log, + "No tserver for failed write session " + location + " " + usid); + break; + } + + try { + if (timeout < context.getClientTimeoutInMillis()) { + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context, + timeout); + } else { + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); + } + + client.closeUpdate(TraceUtil.traceInfo(), usid.getAsLong()); + retry.logCompletion(log, "Closed failed write session " + location + " " + usid); + break; + } catch (NoSuchScanIDException e) { + retry.logCompletion(log, + "Failed write session no longer exists " + location + " " + usid); + // The session no longer exists, so done + break; + } catch (TApplicationException tae) { + // no need to bother closing session in this case + updateServerErrors(location, tae); + break; + } catch (ThriftSecurityException e) { + throw e; + } catch (TException e) { + retry.waitForNextAttempt(log, "Attempting to close failed write session " + location + + " " + usid + " " + e.getMessage()); + } finally { + ThriftUtil.returnClient(client, context); + } + + // if a timeout is set on the batch writer, then do not retry longer than the timeout + if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) > timeout) { + log.debug("Giving up on closing session {} {} and timing out.", location, usid); + throw new TimedOutException(Set.of(location)); + } + } + } + } } // END code for sending mutations to tablet servers using background threads diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 92c06c85e1..7e19515de6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -528,57 +528,66 @@ public class TabletClientHandler implements TabletClientService.Iface { @Override public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException { - final UpdateSession us = (UpdateSession) server.sessionManager.removeSession(updateID); + // Reserve the session and wait for any write that may currently have it reserved. Once reserved + // no write stragglers can start against this session id. + final UpdateSession us = (UpdateSession) server.sessionManager.reserveSession(updateID, true); if (us == null) { throw new NoSuchScanIDException(); } - // clients may or may not see data from an update session while - // it is in progress, however when the update session is closed - // want to ensure that reads wait for the write to finish - long opid = writeTracker.startWrite(us.queuedMutations.keySet()); - try { - flush(us); - } catch (HoldTimeoutException e) { - // Assumption is that the client has timed out and is gone. If that's not the case throw an - // exception that will cause it to retry. - log.debug("HoldTimeoutException during closeUpdate, reporting no such session"); - throw new NoSuchScanIDException(); + // clients may or may not see data from an update session while + // it is in progress, however when the update session is closed + // want to ensure that reads wait for the write to finish + long opid = writeTracker.startWrite(us.queuedMutations.keySet()); + + try { + flush(us); + } catch (HoldTimeoutException e) { + // Assumption is that the client has timed out and is gone. If that's not the case throw an + // exception that will cause it to retry. 
+ log.debug("HoldTimeoutException during closeUpdate, reporting no such session"); + throw new NoSuchScanIDException(); + } finally { + writeTracker.finishWrite(opid); + } + + if (log.isTraceEnabled()) { + log.trace( + String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", + TServerUtils.clientAddress.get(), us.totalUpdates, + (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes, + us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, us.walogTimes.sum() / 1000.0, + us.commitTimes.sum() / 1000.0)); + } + if (!us.failures.isEmpty()) { + Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next(); + log.debug(String.format("Failures: %d, first extent %s successful commits: %d", + us.failures.size(), first.getKey().toString(), first.getValue())); + } + List<ConstraintViolationSummary> violations = us.violations.asList(); + if (!violations.isEmpty()) { + ConstraintViolationSummary first = us.violations.asList().iterator().next(); + log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), + first.violationDescription, first.numberOfViolatingMutations)); + } + if (!us.authFailures.isEmpty()) { + KeyExtent first = us.authFailures.keySet().iterator().next(); + log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), + first.toString())); + } + return new UpdateErrors( + us.failures.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().toThrift(), Entry::getValue)), + violations.stream().map(ConstraintViolationSummary::toThrift) + .collect(Collectors.toList()), + us.authFailures.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().toThrift(), Entry::getValue))); } finally { - writeTracker.finishWrite(opid); + // Atomically unreserve and delete the session. If there any write stragglers, they will fail + // after this point. 
+ server.sessionManager.removeSession(updateID, true); } - - if (log.isTraceEnabled()) { - log.trace( - String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", - TServerUtils.clientAddress.get(), us.totalUpdates, - (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes, - us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, us.walogTimes.sum() / 1000.0, - us.commitTimes.sum() / 1000.0)); - } - if (!us.failures.isEmpty()) { - Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next(); - log.debug(String.format("Failures: %d, first extent %s successful commits: %d", - us.failures.size(), first.getKey().toString(), first.getValue())); - } - List<ConstraintViolationSummary> violations = us.violations.asList(); - if (!violations.isEmpty()) { - ConstraintViolationSummary first = us.violations.asList().iterator().next(); - log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), - first.violationDescription, first.numberOfViolatingMutations)); - } - if (!us.authFailures.isEmpty()) { - KeyExtent first = us.authFailures.keySet().iterator().next(); - log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), - first.toString())); - } - return new UpdateErrors( - us.failures.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().toThrift(), Entry::getValue)), - violations.stream().map(ConstraintViolationSummary::toThrift).collect(Collectors.toList()), - us.authFailures.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().toThrift(), Entry::getValue))); } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java new file mode 100644 index 0000000000..7c5324c80b --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.security.SecureRandom; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TimeType; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.constraints.Constraint; +import org.apache.accumulo.core.util.UtilWaitThread; +import 
org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.jupiter.api.Test; + +public class WriteAfterCloseIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.MANAGER_RECOVERY_DELAY, "1s"); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s"); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Override + protected Duration defaultTimeout() { + return Duration.ofSeconds(300); + } + + public static class SleepyConstraint implements Constraint { + + private static final SecureRandom rand = new SecureRandom(); + + @Override + public String getViolationDescription(short violationCode) { + return "No such violation"; + } + + @Override + public List<Short> check(Environment env, Mutation mutation) { + + if (mutation.getUpdates().stream().anyMatch(ColumnUpdate::isDeleted)) { + // only want to randomly sleep for inserts, not deletes + return null; + } + + // the purpose of this constraint is to just randomly hold up inserts on the server side + if (rand.nextBoolean()) { + UtilWaitThread.sleep(4000); + } + + return null; + } + } + + @Test + public void testWriteAfterCloseMillisTime() throws Exception { + runTest(TimeType.MILLIS, false, 0, false); + } + + @Test + public void testWriteAfterCloseLogicalTime() throws Exception { + runTest(TimeType.LOGICAL, false, 0, false); + } + + @Test + public void testWriteAfterCloseKillTservers() throws Exception { + runTest(TimeType.MILLIS, true, 0, false); + } + + @Test + public void testWriteAfterCloseTimeout() throws Exception { + // ensure that trying to close seesions does not interfere with timeout + runTest(TimeType.MILLIS, false, 2000, true); + } + + private 
void runTest(TimeType timeType, boolean killTservers, long timeout, boolean expectErrors) + throws Exception { + // re #3721 test that tries to cause a write event to happen after a batch writer is closed + String table = getUniqueNames(1)[0]; + var props = new Properties(); + props.putAll(getClientProps()); + props.setProperty(Property.GENERAL_RPC_TIMEOUT.getKey(), "1s"); + + NewTableConfiguration ntc = new NewTableConfiguration().setTimeType(timeType); + ntc.setProperties( + Map.of(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", SleepyConstraint.class.getName())); + + // The short rpc timeout and the random sleep in the constraint can cause some of the writes + // done by a batch writer to timeout. The batch writer will internally retry the write, but the + // timed out write could still go through at a later time. + + var executor = Executors.newCachedThreadPool(); + + try (AccumuloClient c = Accumulo.newClient().from(props).build()) { + c.tableOperations().create(table, ntc); + + List<Future<?>> futures = new ArrayList<>(); + + for (int i = 0; i < 100; i++) { + futures.add(executor.submit(createWriteTask(i * 1000, c, table, timeout))); + } + + if (killTservers) { + Thread.sleep(250); + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + // sleep longer than ZK timeout to let ephemeral lock nodes expire in ZK + Thread.sleep(11000); + getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); + } + + int errorCount = 0; + + // wait for all futures to complete + for (var future : futures) { + try { + future.get(); + } catch (ExecutionException e) { + errorCount++; + } + } + + if (expectErrors) { + assertTrue(errorCount > 0); + } else { + assertEquals(0, errorCount); + + try (Scanner scanner = c.createScanner(table)) { + // every insertion was deleted so table should be empty unless there were out of order + // writes + assertEquals(0, scanner.stream().count()); + } + } + } finally { + executor.shutdownNow(); + } + } + + 
private static Callable<Void> createWriteTask(int row, AccumuloClient c, String table, + long timeout) { + return () -> { + + BatchWriterConfig bwc = new BatchWriterConfig().setTimeout(timeout, TimeUnit.MILLISECONDS); + + try (BatchWriter writer = c.createBatchWriter(table, bwc)) { + Mutation m = new Mutation("r" + row); + m.put("f1", "q1", new Value("v1")); + writer.addMutation(m); + } + + // Relying on the internal retries of the batch writer, trying to create a situation where + // some of the writes from above actually happen after the delete below which would negate the + // delete. + + try (BatchWriter writer = c.createBatchWriter(table)) { + Mutation m = new Mutation("r" + row); + m.putDelete("f1", "q1"); + writer.addMutation(m); + } + return null; + }; + } +}