gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450941019
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -392,90 +397,29 @@ public void testSetConfigs() {
}
@Test
- public void testThreadSafety() throws Throwable {
- long runtimeMs = 5_000;
- int numThreads = 10;
- // Check that multiple threads using RetryWithToleranceOperator
concurrently
- // can't corrupt the state of the ProcessingContext
- AtomicReference<Throwable> failed = new AtomicReference<>(null);
- RetryWithToleranceOperator retryWithToleranceOperator = new
RetryWithToleranceOperator(0,
- ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM,
errorHandlingMetrics, new ProcessingContext() {
- private final AtomicInteger count = new AtomicInteger();
- private final AtomicInteger attempt = new AtomicInteger();
-
- @Override
- public void error(Throwable error) {
- if (count.getAndIncrement() > 0) {
- failed.compareAndSet(null, new
AssertionError("Concurrent call to error()"));
- }
- super.error(error);
- }
-
- @Override
- public Future<Void> report() {
- if (count.getAndSet(0) > 1) {
- failed.compareAndSet(null, new
AssertionError("Concurrent call to error() in report()"));
- }
-
- return super.report();
- }
-
- @Override
- public void currentContext(Stage stage, Class<?> klass) {
- this.attempt.set(0);
- super.currentContext(stage, klass);
- }
-
- @Override
- public void attempt(int attempt) {
- if (!this.attempt.compareAndSet(attempt - 1, attempt))
{
- failed.compareAndSet(null, new AssertionError(
- "Concurrent call to attempt(): Attempts
should increase monotonically " +
- "within the scope of a given
currentContext()"));
- }
- super.attempt(attempt);
- }
- }, new CountDownLatch(1));
-
- ExecutorService pool = Executors.newFixedThreadPool(numThreads);
- List<? extends Future<?>> futures = IntStream.range(0,
numThreads).boxed()
- .map(id ->
- pool.submit(() -> {
- long t0 = System.currentTimeMillis();
- long i = 0;
- while (true) {
- if (++i % 10000 == 0 &&
System.currentTimeMillis() > t0 + runtimeMs) {
- break;
- }
- if (failed.get() != null) {
- break;
- }
- try {
- if (id < numThreads / 2) {
-
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
- SinkTask.class,
consumerRecord, new Throwable()).get();
- } else {
- retryWithToleranceOperator.execute(()
-> null, Stage.TRANSFORMATION,
- SinkTask.class);
- }
- } catch (Exception e) {
- failed.compareAndSet(null, e);
- }
- }
- }))
- .collect(Collectors.toList());
- pool.shutdown();
- pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
- futures.forEach(future -> {
- try {
- future.get();
- } catch (Exception e) {
- failed.compareAndSet(null, e);
- }
Review Comment:
I think getting a deterministic reproduction case would involve programmatic
breakpoints like what I used in the SynchronizationTest, and would involve
modifying the WorkerSourceTask to be able to intercept between the execute()
and failed() calls.
I think if I were to copy-paste the call-site from
WorkerSinkTask/WorkerErrantRecordReporter into an explicit test for this, then
it does nothing to prove that the bug is absent in the main code. I think that
is the core flaw in the testThreadSafety test: the class is "thread safe" but
the call-site of it wasn't.
Also when i say "sometimes", i mean like 50%. I tweaked the constants in
that test (1000 records, and a batch size of 5) to increase the number of
opportunities for the bug to surface until it easy to find.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -392,90 +397,29 @@ public void testSetConfigs() {
}
@Test
- public void testThreadSafety() throws Throwable {
- long runtimeMs = 5_000;
- int numThreads = 10;
- // Check that multiple threads using RetryWithToleranceOperator
concurrently
- // can't corrupt the state of the ProcessingContext
- AtomicReference<Throwable> failed = new AtomicReference<>(null);
- RetryWithToleranceOperator retryWithToleranceOperator = new
RetryWithToleranceOperator(0,
- ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM,
errorHandlingMetrics, new ProcessingContext() {
- private final AtomicInteger count = new AtomicInteger();
- private final AtomicInteger attempt = new AtomicInteger();
-
- @Override
- public void error(Throwable error) {
- if (count.getAndIncrement() > 0) {
- failed.compareAndSet(null, new
AssertionError("Concurrent call to error()"));
- }
- super.error(error);
- }
-
- @Override
- public Future<Void> report() {
- if (count.getAndSet(0) > 1) {
- failed.compareAndSet(null, new
AssertionError("Concurrent call to error() in report()"));
- }
-
- return super.report();
- }
-
- @Override
- public void currentContext(Stage stage, Class<?> klass) {
- this.attempt.set(0);
- super.currentContext(stage, klass);
- }
-
- @Override
- public void attempt(int attempt) {
- if (!this.attempt.compareAndSet(attempt - 1, attempt))
{
- failed.compareAndSet(null, new AssertionError(
- "Concurrent call to attempt(): Attempts
should increase monotonically " +
- "within the scope of a given
currentContext()"));
- }
- super.attempt(attempt);
- }
- }, new CountDownLatch(1));
-
- ExecutorService pool = Executors.newFixedThreadPool(numThreads);
- List<? extends Future<?>> futures = IntStream.range(0,
numThreads).boxed()
- .map(id ->
- pool.submit(() -> {
- long t0 = System.currentTimeMillis();
- long i = 0;
- while (true) {
- if (++i % 10000 == 0 &&
System.currentTimeMillis() > t0 + runtimeMs) {
- break;
- }
- if (failed.get() != null) {
- break;
- }
- try {
- if (id < numThreads / 2) {
-
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
- SinkTask.class,
consumerRecord, new Throwable()).get();
- } else {
- retryWithToleranceOperator.execute(()
-> null, Stage.TRANSFORMATION,
- SinkTask.class);
- }
- } catch (Exception e) {
- failed.compareAndSet(null, e);
- }
- }
- }))
- .collect(Collectors.toList());
- pool.shutdown();
- pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
- futures.forEach(future -> {
- try {
- future.get();
- } catch (Exception e) {
- failed.compareAndSet(null, e);
- }
Review Comment:
I think getting a deterministic reproduction case would involve programmatic
breakpoints like what I used in the SynchronizationTest, and would involve
modifying the WorkerSourceTask to be able to intercept between the execute()
and failed() calls.
I think if I were to copy-paste the call-site from
WorkerSinkTask/WorkerErrantRecordReporter into an explicit test for this, then
it does nothing to prove that the bug is absent in the main code. I think that
is the core flaw in the testThreadSafety test: the class is "thread safe" but
the call-site of it wasn't.
Also when i say "sometimes", i mean like 50%. I tweaked the constants in
that test (1000 records, and a batch size of 5) to increase the number of
opportunities for the bug to surface until it was easy to find.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]