This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new b0e2fd0  Wrap and unwrap tasks in thread pools properly (#2442)
b0e2fd0 is described below

commit b0e2fd0d9e2f7c9877edea47650e8f0b6683dd36
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Mon Jan 31 07:55:35 2022 -0500

    Wrap and unwrap tasks in thread pools properly (#2442)
    
    Wrap Runnable and Callable objects properly inside thread pools in a way
    that allows us to unwrap them to make comparisons in priority blocking
    queues, and to remove them from blocking queues when we're directly
    manipulating those thread pool queues.
    
    This fixes #2362
---
 .../org/apache/accumulo/core/trace/TraceUtil.java  | 17 +++++
 .../accumulo/core/trace/TraceWrappedCallable.java  | 73 ++++++++++++++++++++++
 .../accumulo/core/trace/TraceWrappedRunnable.java  | 72 +++++++++++++++++++++
 .../accumulo/core/util/threads/ThreadPools.java    | 33 +++++-----
 .../apache/accumulo/core/util/threads/Threads.java |  4 +-
 .../org/apache/accumulo/server/AbstractServer.java |  8 +--
 .../tserver/TabletServerResourceManager.java       |  6 +-
 .../compactions/InternalCompactionExecutor.java    |  5 +-
 8 files changed, 189 insertions(+), 29 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java b/core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java
index 4dc0890..ff91f04 100644
--- a/core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceUtil.java
@@ -22,6 +22,7 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Proxy;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -190,6 +191,22 @@ public class TraceUtil {
         });
   }
 
+  public static Runnable wrap(Runnable r) {
+    return r instanceof TraceWrappedRunnable ? r : new TraceWrappedRunnable(r);
+  }
+
+  public static Runnable unwrap(Runnable r) {
+    return TraceWrappedRunnable.unwrapFully(r);
+  }
+
+  public static <T> Callable<T> wrap(Callable<T> c) {
+    return c instanceof TraceWrappedCallable ? c : new TraceWrappedCallable<>(c);
+  }
+
+  public static <T> Callable<T> unwrap(Callable<T> c) {
+    return TraceWrappedCallable.unwrapFully(c);
+  }
+
   public static <T> T wrapService(final T instance) {
     InvocationHandler handler = (obj, method, args) -> {
       if (args == null || args.length < 1 || args[0] == null || !(args[0] instanceof TInfo)) {
diff --git a/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedCallable.java b/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedCallable.java
new file mode 100644
index 0000000..56fc846
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedCallable.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.trace;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+/**
+ * A class to wrap {@link Callable}s for {@link ThreadPools} in a way that still provides access to
+ * the wrapped {@link Callable} instance. This supersedes the use of {@link Context#wrap(Callable)}.
+ */
+class TraceWrappedCallable<V> implements Callable<V> {
+
+  private final Context context;
+  private final Callable<V> unwrapped;
+
+  static <C> Callable<C> unwrapFully(Callable<C> c) {
+    while (c instanceof TraceWrappedCallable) {
+      c = ((TraceWrappedCallable<C>) c).unwrapped;
+    }
+    return c;
+  }
+
+  TraceWrappedCallable(Callable<V> other) {
+    this.context = Context.current();
+    this.unwrapped = unwrapFully(other);
+  }
+
+  @Override
+  public V call() throws Exception {
+    try (Scope unused = context.makeCurrent()) {
+      return unwrapped.call();
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof TraceWrappedCallable) {
+      return Objects.equals(unwrapped, ((TraceWrappedCallable<?>) obj).unwrapped);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return unwrapped.hashCode();
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedRunnable.java b/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedRunnable.java
new file mode 100644
index 0000000..fb2ff31
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceWrappedRunnable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.trace;
+
+import java.util.Objects;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+
+/**
+ * A class to wrap {@link Runnable}s for {@link ThreadPools} in a way that still provides access to
+ * the wrapped {@link Runnable} instance. This supersedes the use of {@link Context#wrap(Runnable)}.
+ */
+class TraceWrappedRunnable implements Runnable {
+
+  private final Context context;
+  private final Runnable unwrapped;
+
+  static Runnable unwrapFully(Runnable r) {
+    while (r instanceof TraceWrappedRunnable) {
+      r = ((TraceWrappedRunnable) r).unwrapped;
+    }
+    return r;
+  }
+
+  TraceWrappedRunnable(Runnable other) {
+    this.context = Context.current();
+    this.unwrapped = unwrapFully(other);
+  }
+
+  @Override
+  public void run() {
+    try (Scope unused = context.makeCurrent()) {
+      unwrapped.run();
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof TraceWrappedRunnable) {
+      return Objects.equals(unwrapped, ((TraceWrappedRunnable) obj).unwrapped);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return unwrapped.hashCode();
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index ac4ed3d..642ec7f 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -33,11 +33,10 @@ import java.util.function.IntSupplier;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.trace.TraceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.opentelemetry.context.Context;
-
 public class ThreadPools {
 
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
@@ -326,27 +325,27 @@ public class ThreadPools {
 
       @Override
       public void execute(Runnable arg0) {
-        super.execute(Context.current().wrap(arg0));
+        super.execute(TraceUtil.wrap(arg0));
       }
 
       @Override
       public boolean remove(Runnable task) {
-        return super.remove(Context.current().wrap(task));
+        return super.remove(TraceUtil.wrap(task));
       }
 
       @Override
       public <T> Future<T> submit(Callable<T> task) {
-        return super.submit(Context.current().wrap(task));
+        return super.submit(TraceUtil.wrap(task));
       }
 
       @Override
       public <T> Future<T> submit(Runnable task, T result) {
-        return super.submit(Context.current().wrap(task), result);
+        return super.submit(TraceUtil.wrap(task), result);
       }
 
       @Override
       public Future<?> submit(Runnable task) {
-        return super.submit(Context.current().wrap(task));
+        return super.submit(TraceUtil.wrap(task));
       }
     };
     if (timeOut > 0) {
@@ -390,51 +389,49 @@ public class ThreadPools {
 
       @Override
       public void execute(Runnable command) {
-        super.execute(Context.current().wrap(command));
+        super.execute(TraceUtil.wrap(command));
       }
 
       @Override
       public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-        return super.schedule(Context.current().wrap(callable), delay, unit);
+        return super.schedule(TraceUtil.wrap(callable), delay, unit);
       }
 
       @Override
       public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-        return super.schedule(Context.current().wrap(command), delay, unit);
+        return super.schedule(TraceUtil.wrap(command), delay, unit);
       }
 
       @Override
       public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
           long period, TimeUnit unit) {
-        return super.scheduleAtFixedRate(Context.current().wrap(command), initialDelay, period,
-            unit);
+        return super.scheduleAtFixedRate(TraceUtil.wrap(command), initialDelay, period, unit);
       }
 
       @Override
       public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
           long delay, TimeUnit unit) {
-        return super.scheduleWithFixedDelay(Context.current().wrap(command), initialDelay, delay,
-            unit);
+        return super.scheduleWithFixedDelay(TraceUtil.wrap(command), initialDelay, delay, unit);
       }
 
       @Override
       public <T> Future<T> submit(Callable<T> task) {
-        return super.submit(Context.current().wrap(task));
+        return super.submit(TraceUtil.wrap(task));
       }
 
       @Override
       public <T> Future<T> submit(Runnable task, T result) {
-        return super.submit(Context.current().wrap(task), result);
+        return super.submit(TraceUtil.wrap(task), result);
       }
 
       @Override
       public Future<?> submit(Runnable task) {
-        return super.submit(Context.current().wrap(task));
+        return super.submit(TraceUtil.wrap(task));
       }
 
       @Override
       public boolean remove(Runnable task) {
-        return super.remove(Context.current().wrap(task));
+        return super.remove(TraceUtil.wrap(task));
       }
 
     };
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
index 2a05a4c..3478ffa 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
@@ -21,7 +21,7 @@ package org.apache.accumulo.core.util.threads;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.OptionalInt;
 
-import io.opentelemetry.context.Context;
+import org.apache.accumulo.core.trace.TraceUtil;
 
 public class Threads {
 
@@ -36,7 +36,7 @@ public class Threads {
   }
 
   public static Thread createThread(String name, OptionalInt priority, Runnable r) {
-    Thread thread = new Thread(Context.current().wrap(r), name);
+    Thread thread = new Thread(TraceUtil.wrap(r), name);
     priority.ifPresent(thread::setPriority);
     thread.setDaemon(true);
     thread.setUncaughtExceptionHandler(UEH);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index 36dcf3a..bad0804 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -30,8 +30,6 @@ import org.apache.accumulo.server.security.SecurityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.opentelemetry.context.Context;
-
 public abstract class AbstractServer implements AutoCloseable, Runnable {
 
   private final ServerContext context;
@@ -63,10 +61,8 @@ public abstract class AbstractServer implements AutoCloseable, Runnable {
    */
   public void runServer() throws Exception {
     final AtomicReference<Throwable> err = new AtomicReference<>();
-    Thread service = new Thread(Context.current().wrap(this), applicationName);
-    service.setUncaughtExceptionHandler((thread, exception) -> {
-      err.set(exception);
-    });
+    Thread service = new Thread(TraceUtil.wrap(this), applicationName);
+    service.setUncaughtExceptionHandler((thread, exception) -> err.set(exception));
     service.start();
     service.join();
     Throwable thrown = err.get();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 31432e9..79d153f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -65,6 +65,7 @@ import org.apache.accumulo.core.spi.scan.ScanExecutor;
 import org.apache.accumulo.core.spi.scan.ScanInfo;
 import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
 import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
+import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.ServerContext;
@@ -175,8 +176,9 @@ public class TabletServerResourceManager {
               }
             });
 
-        // function to extract scan scan session from runnable
-        Function<Runnable,ScanInfo> extractor = r -> ((ScanSession.ScanMeasurer) r).getScanInfo();
+        // function to extract scan session from runnable
+        Function<Runnable,ScanInfo> extractor =
+            r -> ((ScanSession.ScanMeasurer) TraceUtil.unwrap(r)).getScanInfo();
 
         queue = new PriorityBlockingQueue<>(sec.maxThreads,
             Comparator.comparing(extractor, comparator));
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index b964b2a..56b0814 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -125,6 +126,7 @@ public class InternalCompactionExecutor implements CompactionExecutor {
         // priority. This runs periodically, instead of every time something is canceled, to avoid
         // hurting performance.
         queue.removeIf(runnable -> {
+          runnable = TraceUtil.unwrap(runnable);
           InternalJob internalJob;
           if (runnable instanceof InternalJob) {
             internalJob = (InternalJob) runnable;
@@ -145,6 +147,7 @@ public class InternalCompactionExecutor implements CompactionExecutor {
   }
 
   private static CompactionJob getJob(Runnable r) {
+    r = TraceUtil.unwrap(r);
     if (r instanceof InternalJob) {
       return ((InternalJob) r).getJob();
     }
@@ -157,7 +160,7 @@ public class InternalCompactionExecutor implements CompactionExecutor {
     var comparator = Comparator.comparing(InternalCompactionExecutor::getJob,
         CompactionJobPrioritizer.JOB_COMPARATOR);
 
-    queue = new PriorityBlockingQueue<Runnable>(100, comparator);
+    queue = new PriorityBlockingQueue<>(100, comparator);
 
     threadPool = ThreadPools.createThreadPool(threads, threads, 60, TimeUnit.SECONDS,
         "compaction." + ceid, queue, false);

Reply via email to