C0urante commented on code in PR #15598: URL: https://github.com/apache/kafka/pull/15598#discussion_r1695447336
########## clients/src/test/java/org/apache/kafka/common/utils/MockExit.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Mock fatal procedures for use in testing environments where the application should not terminate the JVM. + * <p>Use factory methods {@link #disallowFatal()} ()}, {@link #forExit(Procedure)}, + * {@link #forExitAndHalt(Procedure, Procedure)}, and {@link #forHalt(Procedure)} + * to construct test-safe alternatives for {@link Exit#system()}. + * <p>Instances of this class throw from {@link #exitOrThrow(int, String)} and {@link #haltOrThrow(int, String)}, + * killing the calling thread, but keeping the JVM alive. Call {@link #close()} at the end of the test to run + * hooks and assertions. 
+ */ +public class MockExit extends Exit implements AutoCloseable { + + @FunctionalInterface + public interface Procedure extends Exit.Procedure { + void execute(int statusCode, String message); + static Procedure noop() { + return (statusCode, message) -> { }; + } + } + + @FunctionalInterface + public interface ShutdownHookAdder extends Exit.ShutdownHookAdder { + void addShutdownHook(String name, Runnable runnable); + + static ShutdownHookAdder noop() { + return (name, runnable) -> { }; + } + } Review Comment: Is the intent with these to eventually replace the superclass's `Procedure` and `ShutdownHookAdder` interfaces? ########## clients/src/main/java/org/apache/kafka/common/utils/Exit.java: ########## @@ -119,4 +134,138 @@ public static void resetHaltProcedure() { public static void resetShutdownHookAdder() { shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER; } + + /** + * @return the default system exit behavior. Using this grants the ability to stop the JVM at any time. + */ + public static Exit system() { + return SystemExit.SYSTEM; + } + + /** + * @return an immutable reference to exit behavior that is active at the time this method is evaluated. + * <p>This may grant the ability to stop the JVM at any time if the static exit behavior has not been changed. + * <p>Note: changes to the static exit behavior made after this method will not apply to the returned object. + * This is used as a temporary shim between the mutable-static and immutable-instance forms of the Exit class. + * This is intended to be called as early as possible on the same thread that calls + * {@link #setExitProcedure(Procedure)}, {@link #setHaltProcedure(Procedure)}, + * or {@link #addShutdownHook(String, Runnable)} to avoid race conditions. 
+ */ + public static Exit staticContext() { + Procedure exitProcedure = Exit.exitProcedure; + Procedure haltProcedure = Exit.haltProcedure; + ShutdownHookAdder shutdownHookAdder = Exit.shutdownHookAdder; + if (exitProcedure != DEFAULT_EXIT_PROCEDURE + || haltProcedure != DEFAULT_HALT_PROCEDURE + || shutdownHookAdder != DEFAULT_SHUTDOWN_HOOK_ADDER + ) { + // Static exit is mocked + return new StaticContext(exitProcedure, haltProcedure, shutdownHookAdder); + } else { + // No mocks are present, use system procedures. The singleton is used to enable reference equality checks. + return system(); + } + } + + /** + * @see #exitOrThrow(int, String) + */ + public void exitOrThrow(int statusCode) { + exitOrThrow(statusCode, null); + } + + /** + * Terminate the running Java Virtual Machine, or throw an exception if this is not possible. + * <p>By default, this behaves like {@link Runtime#exit(int)}. + * @param message Human-readable termination message to aid in debugging, maybe null. + * @throws Error If termination has been replaced by mocked behavior + * + * @see Runtime#exit + * @see #haltOrThrow + * @see #addShutdownRunnable + */ + public abstract void exitOrThrow(int statusCode, String message); + + /** + * @see #haltOrThrow(int, String) + */ + public void haltOrThrow(int statusCode) { + haltOrThrow(statusCode, null); + } + + /** + * Terminate the running Java Virtual Machine, or throw an exception if this is not possible. + * <p>By default, this behaves like {@link Runtime#halt(int)}. + * @param message Human-readable termination message to aid in debugging, maybe null + * @throws Error If termination has been replaced by mocked behavior + * + * @see Runtime#halt + * @see #exitOrThrow + * @see #addShutdownRunnable + */ + public abstract void haltOrThrow(int statusCode, String message); + + /** + * <p>By default, this behaves like {@link Runtime#addShutdownHook(Thread)}. + * @param name The name of the thread executing the runnable, maybe null. 
+ * @param runnable The operation that should take place at shutdown, non-null. + * @see Runtime#addShutdownHook + * @see #exitOrThrow + * @see #haltOrThrow + */ + public abstract void addShutdownRunnable(String name, Runnable runnable); + + private static final class SystemExit extends Exit { Review Comment: Nit: it's a little strange to have `MockExit` in its own top-level file, but `SystemExit` as an inner class. I guess one reason for this could be to ensure that `SystemExit` is never accessed directly, but if we use the singleton pattern (ensuring at least that the class is never instantiated by external callers) and make it package-private, would that be sufficient to make sure it isn't abused? I'm also trying to see if we can reduce the size of the `Exit` class file to make it easier for new readers to grok. ########## clients/src/main/java/org/apache/kafka/common/utils/Exit.java: ########## @@ -55,22 +55,37 @@ public interface ShutdownHookAdder { private static volatile Procedure haltProcedure = DEFAULT_HALT_PROCEDURE; private static volatile ShutdownHookAdder shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER; + /** + * Use of this method is discouraged, and should be replaced with {@link #exitOrThrow(int)} + */ Review Comment: For this method and the other newly-discouraged ones, would deprecation be an option? Or would that be noisier than it's worth? ########## clients/src/test/java/org/apache/kafka/common/utils/MockExit.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Mock fatal procedures for use in testing environments where the application should not terminate the JVM. + * <p>Use factory methods {@link #disallowFatal()} ()}, {@link #forExit(Procedure)}, + * {@link #forExitAndHalt(Procedure, Procedure)}, and {@link #forHalt(Procedure)} + * to construct test-safe alternatives for {@link Exit#system()}. + * <p>Instances of this class throw from {@link #exitOrThrow(int, String)} and {@link #haltOrThrow(int, String)}, + * killing the calling thread, but keeping the JVM alive. Call {@link #close()} at the end of the test to run + * hooks and assertions. + */ +public class MockExit extends Exit implements AutoCloseable { + + @FunctionalInterface + public interface Procedure extends Exit.Procedure { + void execute(int statusCode, String message); + static Procedure noop() { + return (statusCode, message) -> { }; + } + } + + @FunctionalInterface + public interface ShutdownHookAdder extends Exit.ShutdownHookAdder { + void addShutdownHook(String name, Runnable runnable); + + static ShutdownHookAdder noop() { + return (name, runnable) -> { }; + } + } + + private static final long SHUTDOWN_HOOKS_MILLIS = TimeUnit.SECONDS.toMillis(30); + + /** + * Fail the test after any fatal procedure is called. 
+ * This should be the default for any tests expecting happy-path execution. + */ + public static MockExit disallowFatal() { + return new MockExit(Procedure.noop(), Procedure.noop(), ShutdownHookAdder.noop(), true); + } + + /** + * Allow any fatal procedure to be called during correct execution + * @param exitProcedure Procedure to run for {@link #exit(int, String)} + * @param haltProcedure Procedure to run for {@link #halt(int, String)} + */ + public static MockExit forExitAndHalt(Procedure exitProcedure, Procedure haltProcedure) { + return new MockExit(exitProcedure, haltProcedure, ShutdownHookAdder.noop(), false); + } + + /** + * Expect the exit procedure to be called + * @param exitProcedure Procedure to run for {@link #exit(int, String)} + */ + public static MockExit forExit(Procedure exitProcedure) { + return new MockExit(exitProcedure, Procedure.noop(), ShutdownHookAdder.noop(), false); + } + + /** + * Expect the halt procedure to be called + * @param haltProcedure Procedure to run for {@link #halt(int, String)} + */ + public static MockExit forHalt(Procedure haltProcedure) { + return new MockExit(Procedure.noop(), haltProcedure, ShutdownHookAdder.noop(), false); + } + + private final AtomicReference<Throwable> firstFatalCall; + private final Procedure exitProcedure; + private final Procedure haltProcedure; + // Must synchronize on this to access. 
+ private IdentityHashMap<Thread, Thread> shutdownHooks; + private final ShutdownHookAdder shutdownHookAdder; + private final boolean failIfFatal; + + private MockExit( + Procedure exitProcedure, + Procedure haltProcedure, + ShutdownHookAdder shutdownHookAdder, + boolean failIfFatal + ) { + this.exitProcedure = Objects.requireNonNull(exitProcedure, "Exit Procedure may not be null"); + this.haltProcedure = Objects.requireNonNull(haltProcedure, "Halt Procedure may not be null"); + this.shutdownHookAdder = Objects.requireNonNull(shutdownHookAdder, "ShutdownHookAdder may noy be null"); + this.firstFatalCall = new AtomicReference<>(); + this.shutdownHooks = new IdentityHashMap<>(); + this.failIfFatal = failIfFatal; + } + + + @Override + public void exitOrThrow(int statusCode, String message) { + runFatalProcedure(exitProcedure, statusCode, message, "exit"); + } + + @Override + public void haltOrThrow(int statusCode, String message) { + runFatalProcedure(haltProcedure, statusCode, message, "halt"); + } + + private void runFatalProcedure(Procedure instanceProcedure, int statusCode, String message, String name) { + try { + instanceProcedure.execute(statusCode, message); + throw new MockFatalProcedureError(name, statusCode, message); + } catch (Throwable t) { + firstFatalCall.compareAndSet(null, t); + throw t; + } + } + + @Override + public void addShutdownRunnable(String name, Runnable runnable) { + trackShutdownHook(name, runnable); + shutdownHookAdder.addShutdownHook(name, runnable); + } + + /** + * Run assertions about fatal procedure calls, and shutdown hooks previously registered. + */ + @Override + public void close() { + try { + runAsserts(); + } finally { + runHooks(); Review Comment: Could you possibly give an overview of what kinds of shutdown hooks we add in our code base and whether we anticipate there might be some that it would not make sense to execute during tests? 
########## clients/src/main/java/org/apache/kafka/common/utils/Exit.java: ########## @@ -119,4 +134,138 @@ public static void resetHaltProcedure() { public static void resetShutdownHookAdder() { shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER; } + + /** + * @return the default system exit behavior. Using this grants the ability to stop the JVM at any time. + */ + public static Exit system() { + return SystemExit.SYSTEM; + } + + /** + * @return an immutable reference to exit behavior that is active at the time this method is evaluated. + * <p>This may grant the ability to stop the JVM at any time if the static exit behavior has not been changed. + * <p>Note: changes to the static exit behavior made after this method will not apply to the returned object. + * This is used as a temporary shim between the mutable-static and immutable-instance forms of the Exit class. + * This is intended to be called as early as possible on the same thread that calls + * {@link #setExitProcedure(Procedure)}, {@link #setHaltProcedure(Procedure)}, + * or {@link #addShutdownHook(String, Runnable)} to avoid race conditions. + */ + public static Exit staticContext() { + Procedure exitProcedure = Exit.exitProcedure; + Procedure haltProcedure = Exit.haltProcedure; + ShutdownHookAdder shutdownHookAdder = Exit.shutdownHookAdder; + if (exitProcedure != DEFAULT_EXIT_PROCEDURE + || haltProcedure != DEFAULT_HALT_PROCEDURE + || shutdownHookAdder != DEFAULT_SHUTDOWN_HOOK_ADDER + ) { + // Static exit is mocked + return new StaticContext(exitProcedure, haltProcedure, shutdownHookAdder); + } else { + // No mocks are present, use system procedures. The singleton is used to enable reference equality checks. + return system(); + } + } + + /** + * @see #exitOrThrow(int, String) + */ + public void exitOrThrow(int statusCode) { + exitOrThrow(statusCode, null); + } + + /** + * Terminate the running Java Virtual Machine, or throw an exception if this is not possible. 
+ * <p>By default, this behaves like {@link Runtime#exit(int)}. + * @param message Human-readable termination message to aid in debugging, maybe null. + * @throws Error If termination has been replaced by mocked behavior + * + * @see Runtime#exit + * @see #haltOrThrow + * @see #addShutdownRunnable + */ + public abstract void exitOrThrow(int statusCode, String message); + + /** + * @see #haltOrThrow(int, String) + */ + public void haltOrThrow(int statusCode) { + haltOrThrow(statusCode, null); + } + + /** + * Terminate the running Java Virtual Machine, or throw an exception if this is not possible. + * <p>By default, this behaves like {@link Runtime#halt(int)}. + * @param message Human-readable termination message to aid in debugging, maybe null + * @throws Error If termination has been replaced by mocked behavior + * + * @see Runtime#halt + * @see #exitOrThrow + * @see #addShutdownRunnable + */ + public abstract void haltOrThrow(int statusCode, String message); + + /** + * <p>By default, this behaves like {@link Runtime#addShutdownHook(Thread)}. + * @param name The name of the thread executing the runnable, maybe null. + * @param runnable The operation that should take place at shutdown, non-null. 
+ * @see Runtime#addShutdownHook + * @see #exitOrThrow + * @see #haltOrThrow + */ + public abstract void addShutdownRunnable(String name, Runnable runnable); + + private static final class SystemExit extends Exit { + + private static final Exit SYSTEM = new SystemExit(); + + private SystemExit() { + } + + @Override + public void exitOrThrow(int statusCode, String message) { + System.exit(statusCode); + } + + @Override + public void haltOrThrow(int statusCode, String message) { + Runtime.getRuntime().halt(statusCode); + } + + @Override + public void addShutdownRunnable(String name, Runnable runnable) { + if (name != null) + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + else + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } + } Review Comment: These implementations match the `DEFAULT_HALT_PROCEDURE`, `DEFAULT_EXIT_PROCEDURE`, and `DEFAULT_SHUTDOWN_HOOK_ADDER` fields. Could we possibly de-duplicate them somehow? ########## clients/src/test/java/org/apache/kafka/common/utils/MockExit.java: ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.utils; + +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Mock fatal procedures for use in testing environments where the application should not terminate the JVM. + * <p>Use factory methods {@link #disallowFatal()} ()}, {@link #forExit(Procedure)}, + * {@link #forExitAndHalt(Procedure, Procedure)}, and {@link #forHalt(Procedure)} + * to construct test-safe alternatives for {@link Exit#system()}. + * <p>Instances of this class throw from {@link #exitOrThrow(int, String)} and {@link #haltOrThrow(int, String)}, + * killing the calling thread, but keeping the JVM alive. Call {@link #close()} at the end of the test to run + * hooks and assertions. + */ +public class MockExit extends Exit implements AutoCloseable { + + @FunctionalInterface + public interface Procedure extends Exit.Procedure { + void execute(int statusCode, String message); + static Procedure noop() { + return (statusCode, message) -> { }; + } + } + + @FunctionalInterface + public interface ShutdownHookAdder extends Exit.ShutdownHookAdder { + void addShutdownHook(String name, Runnable runnable); + + static ShutdownHookAdder noop() { + return (name, runnable) -> { }; + } + } + + private static final long SHUTDOWN_HOOKS_MILLIS = TimeUnit.SECONDS.toMillis(30); + + /** + * Fail the test after any fatal procedure is called. + * This should be the default for any tests expecting happy-path execution. 
+ */ + public static MockExit disallowFatal() { + return new MockExit(Procedure.noop(), Procedure.noop(), ShutdownHookAdder.noop(), true); + } + + /** + * Allow any fatal procedure to be called during correct execution + * @param exitProcedure Procedure to run for {@link #exit(int, String)} + * @param haltProcedure Procedure to run for {@link #halt(int, String)} + */ + public static MockExit forExitAndHalt(Procedure exitProcedure, Procedure haltProcedure) { + return new MockExit(exitProcedure, haltProcedure, ShutdownHookAdder.noop(), false); + } + + /** + * Expect the exit procedure to be called + * @param exitProcedure Procedure to run for {@link #exit(int, String)} + */ + public static MockExit forExit(Procedure exitProcedure) { + return new MockExit(exitProcedure, Procedure.noop(), ShutdownHookAdder.noop(), false); + } + + /** + * Expect the halt procedure to be called + * @param haltProcedure Procedure to run for {@link #halt(int, String)} + */ + public static MockExit forHalt(Procedure haltProcedure) { + return new MockExit(Procedure.noop(), haltProcedure, ShutdownHookAdder.noop(), false); + } + + private final AtomicReference<Throwable> firstFatalCall; + private final Procedure exitProcedure; + private final Procedure haltProcedure; + // Must synchronize on this to access. 
+ private IdentityHashMap<Thread, Thread> shutdownHooks; + private final ShutdownHookAdder shutdownHookAdder; + private final boolean failIfFatal; + + private MockExit( + Procedure exitProcedure, + Procedure haltProcedure, + ShutdownHookAdder shutdownHookAdder, + boolean failIfFatal + ) { + this.exitProcedure = Objects.requireNonNull(exitProcedure, "Exit Procedure may not be null"); + this.haltProcedure = Objects.requireNonNull(haltProcedure, "Halt Procedure may not be null"); + this.shutdownHookAdder = Objects.requireNonNull(shutdownHookAdder, "ShutdownHookAdder may noy be null"); + this.firstFatalCall = new AtomicReference<>(); + this.shutdownHooks = new IdentityHashMap<>(); + this.failIfFatal = failIfFatal; + } + + + @Override + public void exitOrThrow(int statusCode, String message) { + runFatalProcedure(exitProcedure, statusCode, message, "exit"); + } + + @Override + public void haltOrThrow(int statusCode, String message) { + runFatalProcedure(haltProcedure, statusCode, message, "halt"); + } + + private void runFatalProcedure(Procedure instanceProcedure, int statusCode, String message, String name) { + try { + instanceProcedure.execute(statusCode, message); + throw new MockFatalProcedureError(name, statusCode, message); + } catch (Throwable t) { + firstFatalCall.compareAndSet(null, t); + throw t; + } + } + + @Override + public void addShutdownRunnable(String name, Runnable runnable) { + trackShutdownHook(name, runnable); + shutdownHookAdder.addShutdownHook(name, runnable); + } + + /** + * Run assertions about fatal procedure calls, and shutdown hooks previously registered. + */ + @Override + public void close() { + try { + runAsserts(); + } finally { + runHooks(); + } + } + + private synchronized void trackShutdownHook(String name, Runnable runnable) { + if (shutdownHooks == null) { + throw new IllegalStateException("MockExit already closed"); + } + Thread t = (name != null) ? 
KafkaThread.nonDaemon(name, runnable) : new Thread(runnable); + shutdownHooks.put(t, t); + } + + private synchronized Set<Thread> clearShutdownHooks() { + if (shutdownHooks == null) { + return Collections.emptySet(); + } + Set<Thread> threads = shutdownHooks.keySet(); + shutdownHooks = null; + return threads; + } + + private void runHooks() { + Set<Thread> hooks = clearShutdownHooks(); + for (Thread t : hooks) { + t.start(); + } + Timer timer = Time.SYSTEM.timer(SHUTDOWN_HOOKS_MILLIS); + for (Thread t : hooks) { + try { + t.join(timer.remainingMs()); + } catch (InterruptedException ignored) { Review Comment: Can/should we at least log a warning here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
