This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new b0e2fd0 Wrap and unwrap tasks in thread pools properly (#2442) b0e2fd0 is described below commit b0e2fd0d9e2f7c9877edea47650e8f0b6683dd36 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Mon Jan 31 07:55:35 2022 -0500 Wrap and unwrap tasks in thread pools properly (#2442) Wrap Runnable and Callable objects properly inside thread pools in a way that allows us to unwrap them to make comparisons in priority blocking queues, and to remove them from blocking queues when we're directly manipulating those thread pool queues. This fixes #2362 --- .../org/apache/accumulo/core/trace/TraceUtil.java | 17 +++++ .../accumulo/core/trace/TraceWrappedCallable.java | 73 ++++++++++++++++++++++ .../accumulo/core/trace/TraceWrappedRunnable.java | 72 +++++++++++++++++++++ .../accumulo/core/util/threads/ThreadPools.java | 33 +++++----- .../apache/accumulo/core/util/threads/Threads.java | 4 +- .../org/apache/accumulo/server/AbstractServer.java | 8 +-- .../tserver/TabletServerResourceManager.java | 6 +- .../compactions/InternalCompactionExecutor.java | 5 +- 8 files changed, 189 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java b/core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java index 4dc0890..ff91f04 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java @@ -22,6 +22,7 @@ import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Proxy; import java.util.Map; +import java.util.concurrent.Callable; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -190,6 +191,22 @@ public class TraceUtil { }); } + public static Runnable wrap(Runnable r) { + return r instanceof TraceWrappedRunnable ? r : new TraceWrappedRunnable(r); + } + + public static Runnable unwrap(Runnable r) { + return TraceWrappedRunnable.unwrapFully(r); + } + + public static <T> Callable<T> wrap(Callable<T> c) { + return c instanceof TraceWrappedCallable ? c : new TraceWrappedCallable<>(c); + } + + public static <T> Callable<T> unwrap(Callable<T> c) { + return TraceWrappedCallable.unwrapFully(c); + } + public static <T> T wrapService(final T instance) { InvocationHandler handler = (obj, method, args) -> { if (args == null || args.length < 1 || args[0] == null || !(args[0] instanceof TInfo)) { diff --git a/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedCallable.java b/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedCallable.java new file mode 100644 index 0000000..56fc846 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedCallable.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.trace; + +import java.util.Objects; +import java.util.concurrent.Callable; + +import org.apache.accumulo.core.util.threads.ThreadPools; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +/** + * A class to wrap {@link Callable}s for {@link ThreadPools} in a way that still provides access to + * the wrapped {@link Callable} instance. This supersedes the use of {@link Context#wrap(Callable)}. + */ +class TraceWrappedCallable<V> implements Callable<V> { + + private final Context context; + private final Callable<V> unwrapped; + + static <C> Callable<C> unwrapFully(Callable<C> c) { + while (c instanceof TraceWrappedCallable) { + c = ((TraceWrappedCallable<C>) c).unwrapped; + } + return c; + } + + TraceWrappedCallable(Callable<V> other) { + this.context = Context.current(); + this.unwrapped = unwrapFully(other); + } + + @Override + public V call() throws Exception { + try (Scope unused = context.makeCurrent()) { + return unwrapped.call(); + } + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof TraceWrappedCallable) { + return Objects.equals(unwrapped, ((TraceWrappedCallable<?>) obj).unwrapped); + } + return false; + } + + @Override + public int hashCode() { + return unwrapped.hashCode(); + } + +} diff --git a/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedRunnable.java b/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedRunnable.java new file mode 100644 index 0000000..fb2ff31 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedRunnable.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.trace; + +import java.util.Objects; + +import org.apache.accumulo.core.util.threads.ThreadPools; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +/** + * A class to wrap {@link Runnable}s for {@link ThreadPools} in a way that still provides access to + * the wrapped {@link Runnable} instance. This supersedes the use of {@link Context#wrap(Runnable)}. + */ +class TraceWrappedRunnable implements Runnable { + + private final Context context; + private final Runnable unwrapped; + + static Runnable unwrapFully(Runnable r) { + while (r instanceof TraceWrappedRunnable) { + r = ((TraceWrappedRunnable) r).unwrapped; + } + return r; + } + + TraceWrappedRunnable(Runnable other) { + this.context = Context.current(); + this.unwrapped = unwrapFully(other); + } + + @Override + public void run() { + try (Scope unused = context.makeCurrent()) { + unwrapped.run(); + } + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof TraceWrappedRunnable) { + return Objects.equals(unwrapped, ((TraceWrappedRunnable) obj).unwrapped); + } + return false; + } + + @Override + public int hashCode() { + return unwrapped.hashCode(); + } + +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index ac4ed3d..642ec7f 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -33,11 +33,10 @@ import java.util.function.IntSupplier; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.core.trace.TraceUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.opentelemetry.context.Context; - public class ThreadPools { private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class); @@ -326,27 +325,27 @@ public class ThreadPools { @Override public void execute(Runnable arg0) { - super.execute(Context.current().wrap(arg0)); + super.execute(TraceUtil.wrap(arg0)); } @Override public boolean remove(Runnable task) { - return super.remove(Context.current().wrap(task)); + return super.remove(TraceUtil.wrap(task)); } @Override public <T> Future<T> submit(Callable<T> task) { - return super.submit(Context.current().wrap(task)); + return super.submit(TraceUtil.wrap(task)); } @Override public <T> Future<T> submit(Runnable task, T result) { - return super.submit(Context.current().wrap(task), result); + return super.submit(TraceUtil.wrap(task), result); } @Override public Future<?> submit(Runnable task) { - return super.submit(Context.current().wrap(task)); + return super.submit(TraceUtil.wrap(task)); } }; if (timeOut > 0) { @@ -390,51 +389,49 @@ public class ThreadPools { @Override public void execute(Runnable command) { - super.execute(Context.current().wrap(command)); + super.execute(TraceUtil.wrap(command)); } @Override public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { - return super.schedule(Context.current().wrap(callable), delay, unit); + return super.schedule(TraceUtil.wrap(callable), delay, unit); } @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { - return super.schedule(Context.current().wrap(command), delay, unit); + return super.schedule(TraceUtil.wrap(command), delay, unit); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return super.scheduleAtFixedRate(Context.current().wrap(command), initialDelay, period, - unit); + return super.scheduleAtFixedRate(TraceUtil.wrap(command), initialDelay, period, unit); } @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return super.scheduleWithFixedDelay(Context.current().wrap(command), initialDelay, delay, - unit); + return super.scheduleWithFixedDelay(TraceUtil.wrap(command), initialDelay, delay, unit); } @Override public <T> Future<T> submit(Callable<T> task) { - return super.submit(Context.current().wrap(task)); + return super.submit(TraceUtil.wrap(task)); } @Override public <T> Future<T> submit(Runnable task, T result) { - return super.submit(Context.current().wrap(task), result); + return super.submit(TraceUtil.wrap(task), result); } @Override public Future<?> submit(Runnable task) { - return super.submit(Context.current().wrap(task)); + return super.submit(TraceUtil.wrap(task)); } @Override public boolean remove(Runnable task) { - return super.remove(Context.current().wrap(task)); + return super.remove(TraceUtil.wrap(task)); } }; diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java index 2a05a4c..3478ffa 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java @@ -21,7 +21,7 @@ package org.apache.accumulo.core.util.threads; import java.lang.Thread.UncaughtExceptionHandler; import java.util.OptionalInt; -import io.opentelemetry.context.Context; +import org.apache.accumulo.core.trace.TraceUtil; public class Threads { @@ -36,7 +36,7 @@ public class Threads { } public static Thread createThread(String name, OptionalInt priority, Runnable r) { - Thread thread = new Thread(Context.current().wrap(r), name); + Thread thread = new Thread(TraceUtil.wrap(r), name); priority.ifPresent(thread::setPriority); thread.setDaemon(true); thread.setUncaughtExceptionHandler(UEH); diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index 36dcf3a..bad0804 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -30,8 +30,6 @@ import org.apache.accumulo.server.security.SecurityUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.opentelemetry.context.Context; - public abstract class AbstractServer implements AutoCloseable, Runnable { private final ServerContext context; @@ -63,10 +61,8 @@ public abstract class AbstractServer implements AutoCloseable, Runnable { */ public void runServer() throws Exception { final AtomicReference<Throwable> err = new AtomicReference<>(); - Thread service = new Thread(Context.current().wrap(this), applicationName); - service.setUncaughtExceptionHandler((thread, exception) -> { - err.set(exception); - }); + Thread service = new Thread(TraceUtil.wrap(this), applicationName); + service.setUncaughtExceptionHandler((thread, exception) -> err.set(exception)); service.start(); service.join(); Throwable thrown = err.get(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 31432e9..79d153f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -65,6 +65,7 @@ import org.apache.accumulo.core.spi.scan.ScanExecutor; import org.apache.accumulo.core.spi.scan.ScanInfo; import org.apache.accumulo.core.spi.scan.ScanPrioritizer; import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher; +import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; @@ -175,8 +176,9 @@ public class TabletServerResourceManager { } }); - // function to extract scan scan session from runnable - Function<Runnable,ScanInfo> extractor = r -> ((ScanSession.ScanMeasurer) r).getScanInfo(); + // function to extract scan session from runnable + Function<Runnable,ScanInfo> extractor = + r -> ((ScanSession.ScanMeasurer) TraceUtil.unwrap(r)).getScanInfo(); queue = new PriorityBlockingQueue<>(sec.maxThreads, Comparator.comparing(extractor, comparator)); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java index b964b2a..56b0814 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -125,6 +126,7 @@ public class InternalCompactionExecutor implements CompactionExecutor { // priority. This runs periodically, instead of every time something is canceled, to avoid // hurting performance. queue.removeIf(runnable -> { + runnable = TraceUtil.unwrap(runnable); InternalJob internalJob; if (runnable instanceof InternalJob) { internalJob = (InternalJob) runnable; @@ -145,6 +147,7 @@ public class InternalCompactionExecutor implements CompactionExecutor { } private static CompactionJob getJob(Runnable r) { + r = TraceUtil.unwrap(r); if (r instanceof InternalJob) { return ((InternalJob) r).getJob(); } @@ -157,7 +160,7 @@ public class InternalCompactionExecutor implements CompactionExecutor { var comparator = Comparator.comparing(InternalCompactionExecutor::getJob, CompactionJobPrioritizer.JOB_COMPARATOR); - queue = new PriorityBlockingQueue<Runnable>(100, comparator); + queue = new PriorityBlockingQueue<>(100, comparator); threadPool = ThreadPools.createThreadPool(threads, threads, 60, TimeUnit.SECONDS, "compaction." + ceid, queue, false);