C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1450850940
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##########
@@ -44,6 +49,12 @@ public ErrantRecordSinkTask() {
public void start(Map<String, String> props) {
super.start(props);
reporter = context.errantRecordReporter();
+ executorService = Executors.newSingleThreadExecutor();
+ }
+
+ @Override
+ public void stop() {
+ ThreadUtils.shutdownExecutorServiceQuietly(executorService, 4,
TimeUnit.SECONDS);
Review Comment:
Just curious--any rationale for four seconds specifically?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java:
##########
@@ -75,9 +77,9 @@ public class ErrorHandlingIntegrationTest {
private static final String DLQ_TOPIC = "my-connector-errors";
private static final String CONNECTOR_NAME = "error-conn";
private static final String TASK_ID = "error-conn-0";
- private static final int NUM_RECORDS_PRODUCED = 20;
- private static final int EXPECTED_CORRECT_RECORDS = 19;
+ private static final int NUM_RECORDS_PRODUCED = 1000;
private static final int EXPECTED_INCORRECT_RECORDS = 1;
+ private static final int EXPECTED_CORRECT_RECORDS = NUM_RECORDS_PRODUCED -
EXPECTED_INCORRECT_RECORDS;
Review Comment:
Again, I know this isn't your fault, but I'm a little confused at the use of
this field in the `testErrantRecordReporter` case. I believe it can be replaced
with `NUM_RECORDS_PRODUCED`
[here](https://github.com/gharris1727/kafka/blob/f5845038014f3df29d505eeadd01403e9756728f/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java#L220),
since this case doesn't cover any errors that should prevent records from
reaching tasks.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrantRecordSinkConnector.java:
##########
@@ -54,7 +65,16 @@ public void put(Collection<SinkRecord> records) {
.computeIfAbsent(rec.topic(), v -> new HashMap<>())
.computeIfAbsent(rec.kafkaPartition(), v -> new
TopicPartition(rec.topic(), rec.kafkaPartition()));
committedOffsets.put(tp, committedOffsets.getOrDefault(tp, 0)
+ 1);
- reporter.report(rec, new Throwable());
+ Throwable error = new Throwable();
+ // Test synchronous and asynchronous reporting, allowing for
re-ordering the errant reports
Review Comment:
We don't have any corresponding verification that this behavior is handled
correctly. If there's an easy, lightweight way to add that, it'd be nice, but
it's not worth blocking on if it's too cumbersome.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java:
##########
@@ -245,7 +249,7 @@ public void testErrantRecordReporter() throws Exception {
// consume failed records from dead letter queue topic
log.info("Consuming records from test topic");
- ConsumerRecords<byte[], byte[]> messages =
connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS,
DLQ_TOPIC);
+ ConsumerRecords<byte[], byte[]> messages =
connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS,
DLQ_TOPIC);
Review Comment:
Nit (not your fault): we don't use the `messages` field at all.
```suggestion
connect.kafka().consume(NUM_RECORDS_PRODUCED,
CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
```
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -392,90 +397,29 @@ public void testSetConfigs() {
}
@Test
- public void testThreadSafety() throws Throwable {
- long runtimeMs = 5_000;
- int numThreads = 10;
- // Check that multiple threads using RetryWithToleranceOperator
concurrently
- // can't corrupt the state of the ProcessingContext
- AtomicReference<Throwable> failed = new AtomicReference<>(null);
- RetryWithToleranceOperator retryWithToleranceOperator = new
RetryWithToleranceOperator(0,
- ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM,
errorHandlingMetrics, new ProcessingContext() {
- private final AtomicInteger count = new AtomicInteger();
- private final AtomicInteger attempt = new AtomicInteger();
-
- @Override
- public void error(Throwable error) {
- if (count.getAndIncrement() > 0) {
- failed.compareAndSet(null, new
AssertionError("Concurrent call to error()"));
- }
- super.error(error);
- }
-
- @Override
- public Future<Void> report() {
- if (count.getAndSet(0) > 1) {
- failed.compareAndSet(null, new
AssertionError("Concurrent call to error() in report()"));
- }
-
- return super.report();
- }
-
- @Override
- public void currentContext(Stage stage, Class<?> klass) {
- this.attempt.set(0);
- super.currentContext(stage, klass);
- }
-
- @Override
- public void attempt(int attempt) {
- if (!this.attempt.compareAndSet(attempt - 1, attempt))
{
- failed.compareAndSet(null, new AssertionError(
- "Concurrent call to attempt(): Attempts
should increase monotonically " +
- "within the scope of a given
currentContext()"));
- }
- super.attempt(attempt);
- }
- }, new CountDownLatch(1));
-
- ExecutorService pool = Executors.newFixedThreadPool(numThreads);
- List<? extends Future<?>> futures = IntStream.range(0,
numThreads).boxed()
- .map(id ->
- pool.submit(() -> {
- long t0 = System.currentTimeMillis();
- long i = 0;
- while (true) {
- if (++i % 10000 == 0 &&
System.currentTimeMillis() > t0 + runtimeMs) {
- break;
- }
- if (failed.get() != null) {
- break;
- }
- try {
- if (id < numThreads / 2) {
-
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
- SinkTask.class,
consumerRecord, new Throwable()).get();
- } else {
- retryWithToleranceOperator.execute(()
-> null, Stage.TRANSFORMATION,
- SinkTask.class);
- }
- } catch (Exception e) {
- failed.compareAndSet(null, e);
- }
- }
- }))
- .collect(Collectors.toList());
- pool.shutdown();
- pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
- futures.forEach(future -> {
- try {
- future.get();
- } catch (Exception e) {
- failed.compareAndSet(null, e);
- }
Review Comment:
Hmmm... it's a little worrisome to see this test (which, as we now know, was
insufficient) completely removed without a replacement. I guess it'd be
difficult to test again and we can sort of hand-wave that the structural
changes in this PR increase thread safety, but if possible, it'd be great to
see at least some new testing coverage.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]