C0urante commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449180005
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements AutoCloseable {
private final ErrorHandlingMetrics errorHandlingMetrics;
private final CountDownLatch stopRequestedLatch;
private volatile boolean stopping; // indicates whether the operator has been asked to stop retrying
-
- protected final ProcessingContext context;
+ private List<ErrorReporter> reporters;
public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
ToleranceType toleranceType, Time time,
ErrorHandlingMetrics errorHandlingMetrics) {
- this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+ this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new CountDownLatch(1));
}
RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
ToleranceType toleranceType, Time time,
ErrorHandlingMetrics errorHandlingMetrics,
- ProcessingContext context, CountDownLatch stopRequestedLatch) {
+ CountDownLatch stopRequestedLatch) {
this.errorRetryTimeout = errorRetryTimeout;
this.errorMaxDelayInMillis = errorMaxDelayInMillis;
this.errorToleranceType = toleranceType;
this.time = time;
this.errorHandlingMetrics = errorHandlingMetrics;
- this.context = context;
this.stopRequestedLatch = stopRequestedLatch;
this.stopping = false;
+ this.reporters = Collections.emptyList();
}
- public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
- ConsumerRecord<byte[], byte[]> consumerRecord,
- Throwable error) {
-
+ public Future<Void> executeFailed(ProcessingContext<?> context, Stage stage, Class<?> executingClass, Throwable error) {
markAsFailed();
- context.consumerRecord(consumerRecord);
context.currentContext(stage, executingClass);
context.error(error);
errorHandlingMetrics.recordFailure();
- Future<Void> errantRecordFuture = context.report();
+ Future<Void> errantRecordFuture = report(context);
if (!withinToleranceLimits()) {
errorHandlingMetrics.recordError();
throw new ConnectException("Tolerance exceeded in error handler", error);
}
return errantRecordFuture;
}
- public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
- SourceRecord sourceRecord,
- Throwable error) {
-
- markAsFailed();
- context.sourceRecord(sourceRecord);
- context.currentContext(stage, executingClass);
- context.error(error);
- errorHandlingMetrics.recordFailure();
- Future<Void> errantRecordFuture = context.report();
- if (!withinToleranceLimits()) {
- errorHandlingMetrics.recordError();
- throw new ConnectException("Tolerance exceeded in Source Worker error handler", error);
+ /**
+ * Report errors. Should be called only if an error was encountered while executing the operation.
+ *
+ * @return a errant record future that potentially aggregates the producer futures
Review Comment:
Nit (I know this is just moved as-is from the `ProcessingContext` class but
we might as well fix it up while we're in the neighborhood):
```suggestion
* @return an errant record future that potentially aggregates the producer futures
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -17,82 +17,36 @@
package org.apache.kafka.connect.runtime.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
import org.apache.kafka.connect.source.SourceRecord;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
/**
- * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
- * to exist per task in a JVM.
+ * Contains all the metadata related to the currently evaluating operation, and associated with a particular
+ * sink or source record from the consumer or task, respectively. This class is not thread safe, and so once an
+ * instance is passed to a new thread, it should no longer be accessed by the previous thread.
Review Comment:
Two small questions:
1. Is it technically safe to pass this back and forth between two threads,
as long as it's not accessed concurrently by them?
2. I don't see any synchronization to ensure that reads of fields like
`position`, `klass`, etc. are always up-to-date across different threads.
Should they be marked `volatile`?
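For context on both questions, here is a minimal sketch (not Connect runtime code) of the general Java Memory Model point. The `java.util.concurrent` package documentation guarantees that actions taken before placing an object into a concurrent collection happen-before actions taken after another thread removes it. Plain fields written before such a handoff are therefore visible to the receiver without `volatile`, and the same reasoning applies to each hop if the object is passed back and forth without concurrent access. `HandoffContext` is a hypothetical stand-in for `ProcessingContext`. Whether the runtime's actual handoffs go through such a primitive is an assumption that would need checking.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for ProcessingContext: plain, non-volatile mutable fields.
class HandoffContext {
    String stage;
    int position;
}

class HandoffDemo {
    // Mutates a context on the calling thread, hands it to a second thread through a
    // BlockingQueue, and returns what the second thread observed.
    static String handOff() {
        BlockingQueue<HandoffContext> queue = new ArrayBlockingQueue<>(1);
        String[] observed = new String[1];

        Thread receiver = new Thread(() -> {
            try {
                // take() happens-after the sender's put(), so every write the sender
                // made before put() is visible here without volatile.
                HandoffContext received = queue.take();
                observed[0] = received.stage + ":" + received.position;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();

        HandoffContext ctx = new HandoffContext();
        ctx.stage = "VALUE_CONVERTER";
        ctx.position = 3;
        try {
            queue.put(ctx);  // the sender must not touch ctx after this point
            receiver.join(); // join() makes the receiver's write to observed[0] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during handoff", e);
        }
        return observed[0];
    }
}
```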
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -168,11 +102,13 @@ public String toString(boolean includeMessage) {
builder.append("' with class '");
builder.append(executingClass() == null ? "null" : executingClass().getName());
builder.append('\'');
- if (includeMessage && sourceRecord() != null) {
+ T original = original();
+ if (includeMessage && original instanceof SourceRecord) {
Review Comment:
Not a blocker, just a thought: if we want to avoid these `instanceof` checks
(and the potential issues that could come with branches like this if an
unexpected type is used), we could tweak the `ProcessingContext` class:
- Make the constructor private
- Add an abstract `recordMessage()` method that can be used in
`toString(boolean includeMessage)` when `includeMessage` is true to replace the
logic inside these `instanceof` checks
- Add static factory methods, such as `public static ProcessingContext<ConsumerRecord<byte[], byte[]>> forConsumerRecord(ConsumerRecord<byte[], byte[]>)`, that return private
subclasses which implement the `toString(boolean includeMessage)` method in the
appropriate fashion for their particular types (could also do public subclasses,
but that'd clutter up the API a bit and isn't necessary IMO)
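A compact sketch of that shape, under stated assumptions: `ConsumerRecordStub` and `SourceRecordStub` are hypothetical stand-ins for Kafka's `ConsumerRecord<byte[], byte[]>` and `SourceRecord` so the block compiles on its own. `FactoryContext` is a trimmed-down, hypothetical version of `ProcessingContext<T>`, and the message formats are simplified rather than copied from the runtime.

```java
// Hypothetical stand-ins for ConsumerRecord<byte[], byte[]> and SourceRecord.
final class ConsumerRecordStub {
    final String topic;
    final int partition;
    final long offset;

    ConsumerRecordStub(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class SourceRecordStub {
    final String topic;

    SourceRecordStub(String topic) {
        this.topic = topic;
    }
}

// Hypothetical, trimmed-down version of the proposed ProcessingContext<T>.
abstract class FactoryContext<T> {
    private final T original;

    // Private: only the nested subclasses below can extend this class.
    private FactoryContext(T original) {
        this.original = original;
    }

    T original() {
        return original;
    }

    // Each subclass knows how to describe its own record type.
    protected abstract String recordMessage();

    String toString(boolean includeMessage) {
        StringBuilder builder = new StringBuilder("Error in processing context");
        if (includeMessage) {
            // No instanceof branch: the subclass supplies the record-specific text.
            builder.append(", where ").append(recordMessage());
        }
        return builder.toString();
    }

    static FactoryContext<ConsumerRecordStub> forConsumerRecord(ConsumerRecordStub record) {
        return new SinkContext(record);
    }

    static FactoryContext<SourceRecordStub> forSourceRecord(SourceRecordStub record) {
        return new SourceContext(record);
    }

    private static final class SinkContext extends FactoryContext<ConsumerRecordStub> {
        private SinkContext(ConsumerRecordStub record) {
            super(record);
        }

        @Override
        protected String recordMessage() {
            ConsumerRecordStub r = original();
            return "consumed record is {topic='" + r.topic + "', partition=" + r.partition
                    + ", offset=" + r.offset + "}";
        }
    }

    private static final class SourceContext extends FactoryContext<SourceRecordStub> {
        private SourceContext(SourceRecordStub record) {
            super(record);
        }

        @Override
        protected String recordMessage() {
            return "source record is {topic='" + original().topic + "'}";
        }
    }
}
```

Because the constructor is private, callers can only obtain instances through the factory methods, so an unexpected record type never reaches `toString`.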
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -303,48 +303,14 @@ public String toString() {
* @param reporters the error reporters (should not be null).
*/
public synchronized void reporters(List<ErrorReporter> reporters) {
- this.context.reporters(reporters);
- }
-
- /**
- * Set the source record being processed in the connect pipeline.
- *
- * @param preTransformRecord the source record
- */
- public synchronized void sourceRecord(SourceRecord preTransformRecord) {
- this.context.sourceRecord(preTransformRecord);
- }
-
- /**
- * Set the record consumed from Kafka in a sink connector.
- *
- * @param consumedMessage the record
- */
- public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
- this.context.consumerRecord(consumedMessage);
- }
-
- /**
- * @return true, if the last operation encountered an error; false otherwise
- */
- public synchronized boolean failed() {
- return this.context.failed();
- }
-
- /**
- * Returns the error encountered when processing the current stage.
- *
- * @return the error encountered when processing the current stage
- */
- public synchronized Throwable error() {
- return this.context.error();
+ this.reporters = Objects.requireNonNull(reporters, "reporters");
Review Comment:
Not a blocker, and probably best left for follow-up: we only ever invoke
this method once across the lifetime of this class. We might consider tweaking
this class to:
- Remove this method
- Change the constructor to accept a `Supplier<List<ErrorReporter>> reporters`
- Wrap that in a `CachedSupplier` (sketched out below)
```java
import java.util.function.Supplier;

public class CachedSupplier<T> implements Supplier<T> {
    private final Supplier<T> supplier;
    private volatile T cached;

    public CachedSupplier(Supplier<T> supplier) {
        this.supplier = supplier;
        this.cached = null;
    }

    @Override
    public T get() {
        if (cached != null) {
            return cached;
        } else {
            // Only lock if we may end up instantiating our cached field
            synchronized (this) {
                // Re-check: another thread may have populated the field while we waited
                if (cached == null)
                    cached = supplier.get();
                return cached;
            }
        }
    }
}
```
That'd eliminate the need for a lot of the synchronization around the
`reporters` field and could simplify the API for the
`RetryWithToleranceOperator` class. If this seems appealing, let me know and I
can throw something together.
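A rough sketch of how the constructor change might look. It repeats a compact copy of the `CachedSupplier` above so that it compiles on its own (reading the volatile field into a local first, a common variant of the same double-checked idiom). `Reporter` and `LazyReportersOperator` are hypothetical stand-ins for `ErrorReporter` and `RetryWithToleranceOperator`.

```java
import java.util.List;
import java.util.function.Supplier;

// Compact copy of the CachedSupplier sketched above, so this block compiles on its own.
class CachedSupplier<T> implements Supplier<T> {
    private final Supplier<T> supplier;
    private volatile T cached;

    CachedSupplier(Supplier<T> supplier) {
        this.supplier = supplier;
    }

    @Override
    public T get() {
        T result = cached; // single volatile read on the fast path
        if (result != null) {
            return result;
        }
        synchronized (this) {
            if (cached == null) {
                cached = supplier.get();
            }
            return cached;
        }
    }
}

// Hypothetical stand-in for ErrorReporter.
interface Reporter {
    void report(String error);
}

// Hypothetical stand-in for RetryWithToleranceOperator: no reporters(List) setter;
// the reporters are resolved lazily, once, on first use.
class LazyReportersOperator {
    private final Supplier<List<Reporter>> reporters;

    LazyReportersOperator(Supplier<List<Reporter>> reporters) {
        this.reporters = new CachedSupplier<>(reporters);
    }

    // No synchronized block needed here: CachedSupplier handles safe publication.
    void reportAll(String error) {
        for (Reporter reporter : reporters.get()) {
            reporter.report(error);
        }
    }
}
```

In this shape, the caller that currently invokes `reporters(...)` would instead pass a lambda that builds the list, and the list is built at most once.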
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -17,30 +17,18 @@
package org.apache.kafka.connect.runtime.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter.ErrantRecordFuture;
import org.apache.kafka.connect.source.SourceRecord;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
/**
- * Contains all the metadata related to the currently evaluating operation. Only one instance of this class is meant
- * to exist per task in a JVM.
+ * Contains all the metadata related to the currently evaluating operation, and associated with a particular
+ * sink or source record from the consumer or task, respectively. This class is not thread safe, and so once an
+ * instance is passed to a new thread, it should no longer be accessed by the previous thread.
*/
-class ProcessingContext implements AutoCloseable {
-
- private Collection<ErrorReporter> reporters = Collections.emptyList();
+public class ProcessingContext {
- private ConsumerRecord<byte[], byte[]> consumedMessage;
- private SourceRecord sourceRecord;
+ private final ConsumerRecord<byte[], byte[]> consumedMessage;
+ private final SourceRecord sourceRecord;
/**
* The following fields need to be reset every time a new record is seen.
Review Comment:
Is this comment still relevant?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java:
##########
@@ -121,20 +121,22 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
/**
* Write the raw records into a Kafka topic and return the producer future.
*
- * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+ * @param context processing context containing the raw record at {@link ProcessingContext#original()}.
* @return the future associated with the writing of this record; never null
*/
- public Future<RecordMetadata> report(ProcessingContext context) {
+ @SuppressWarnings("unchecked")
+ public Future<RecordMetadata> report(ProcessingContext<?> context) {
if (dlqTopicName.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
- ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
- if (originalMessage == null) {
+ if (!(context.original() instanceof ConsumerRecord)) {
Review Comment:
I wish we could retain full type safety here but if it's too much work to be
worth it then we can live with this. Do you think we can at least log a warning
inside this branch if `context.original()` is non-null, though?
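A minimal sketch of the requested warning, kept dependency-free: `RawConsumerRecord` is a hypothetical stand-in for `ConsumerRecord<byte[], byte[]>`, and the `warn` callback stands in for the reporter's logger (for example an SLF4J `log::warn`). A `null` original stays silent, so only non-null values of an unexpected type are flagged.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for ConsumerRecord<byte[], byte[]>.
final class RawConsumerRecord { }

final class DlqGuard {
    private DlqGuard() { }

    /**
     * Returns true if {@code original} can be written to the dead letter queue.
     * Non-null originals of any other type are reported through {@code warn}.
     */
    static boolean canReport(Object original, Consumer<String> warn) {
        if (original instanceof RawConsumerRecord) {
            return true;
        }
        if (original != null) {
            // Unexpected type: most likely a programming error, so surface it.
            warn.accept("Cannot write record of type " + original.getClass().getName()
                    + " to the dead letter queue; only consumer records are supported");
        }
        return false;
    }
}
```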
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -236,7 +237,7 @@ protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Excep
}
// Visible for testing
- void markAsFailed() {
+ synchronized void markAsFailed() {
errorHandlingMetrics.recordErrorTimestamp();
totalFailures++;
}
Review Comment:
(Commenting here because GitHub won't allow me to comment any lower)
Is it possible that `withinToleranceLimits` is still not quite safe enough?
Imagine this sequence of events:
1. A `SinkTask` invokes `WorkerErrantRecordReporter::report` from a separate
thread (i.e., not the one on which `SinkTask::put` is invoked), and as a
result, this [invokes
RetryWithToleranceOperator::executeFailed](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L113),
which in turn [records a
failure](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L107)
such that `RetryWithToleranceOperator::withinToleranceLimits` now returns
false.
2. After this, on the main work thread for the sink task,
`RetryWithToleranceOperator::execute` is invoked for [key/value/header
conversion](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L533C39-L539)
or [record
transformation](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java#L55),
and a tolerable exception is thrown while attempting the operation. A [check on withinToleranceLimits()](https://github.com/gharris1727/kafka/blob/12a61af6f04025cc20897f544920f3b046effed1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L230-L232)
is made, which returns false and causes an exception to be thrown that claims
that the tolerable exception failed the task, when in reality it was the
exception reported via `WorkerErrantRecordReporter::report` that caused it to fail instead.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -143,7 +144,7 @@ public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingCl
* @param <V> return type of the result of the operation.
* @return result of the operation
*/
- public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
+ public <V> V execute(ProcessingContext<?> context, Operation<V> operation, Stage stage, Class<?> executingClass) {
Review Comment:
I know this isn't your fault, but if you have time, could we add a `@throws`
clause to the Javadocs stating that an exception will be thrown if a
non-retriable error is encountered? I always get tripped up reading
interactions with this class, and a big part of it is trying to understand the
conditions where exceptions are thrown, the context is marked as failed, or
`null` is returned.
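A sketch of the kind of Javadoc being asked for, on a heavily simplified, hypothetical `DocumentedOperator` rather than the real class. Retries are omitted, and `IllegalStateException` stands in for `ConnectException`. Only the two outcomes named in this comment (an exception for a non-tolerated error, `null` for a tolerated one) are modeled, and the wording should be checked against the actual implementation before being reused.

```java
import java.util.concurrent.Callable;

// Hypothetical, heavily simplified stand-in for RetryWithToleranceOperator; it exists
// only to show where the @throws and null-return documentation would go.
class DocumentedOperator {
    private final boolean tolerateAll;

    DocumentedOperator(boolean tolerateAll) {
        this.tolerateAll = tolerateAll;
    }

    /**
     * Execute the operation, tolerating failures according to the configured policy.
     *
     * @param operation the operation to run
     * @param <V> return type of the result of the operation
     * @return the result of the operation, or {@code null} if it failed with an
     *         error that was tolerated
     * @throws IllegalStateException if the operation fails and the error is not
     *         tolerated (stand-in for ConnectException in the real class)
     */
    <V> V execute(Callable<V> operation) {
        try {
            return operation.call();
        } catch (Exception e) {
            if (!tolerateAll) {
                throw new IllegalStateException("Tolerance exceeded in error handler", e);
            }
            return null; // tolerated: callers must handle a null result
        }
    }
}
```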
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java:
##########
@@ -121,20 +121,22 @@ public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminPr
/**
* Write the raw records into a Kafka topic and return the producer future.
*
- * @param context processing context containing the raw record at {@link ProcessingContext#consumerRecord()}.
+ * @param context processing context containing the raw record at {@link ProcessingContext#original()}.
* @return the future associated with the writing of this record; never null
*/
- public Future<RecordMetadata> report(ProcessingContext context) {
+ @SuppressWarnings("unchecked")
+ public Future<RecordMetadata> report(ProcessingContext<?> context) {
if (dlqTopicName.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
errorHandlingMetrics.recordDeadLetterQueueProduceRequest();
- ConsumerRecord<byte[], byte[]> originalMessage = context.consumerRecord();
- if (originalMessage == null) {
+ if (!(context.original() instanceof ConsumerRecord)) {
errorHandlingMetrics.recordDeadLetterQueueProduceFailed();
return CompletableFuture.completedFuture(null);
}
+ ProcessingContext<ConsumerRecord<byte[], byte[]>> sinkContext = (ProcessingContext<ConsumerRecord<byte[], byte[]>>) context;
Review Comment:
Just to be safe, should this be `ProcessingContext<? extends ConsumerRecord<byte[], byte[]>>`?
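To illustrate why the bounded form is safer, here is a minimal sketch with hypothetical stand-ins: `Rec` plays the role of `ConsumerRecord<byte[], byte[]>`, `SpecialRec` a subclass of it, and `CastContext<T>` a mutable `ProcessingContext<T>`. The `replace` method is invented purely for the demonstration and is not assumed to exist on the real class. If the real context is effectively read-only, the difference is mostly about what the type claims.

```java
// Hypothetical stand-ins: Rec ~ ConsumerRecord<byte[], byte[]>, SpecialRec ~ a subclass.
class Rec { }

class SpecialRec extends Rec { }

// Hypothetical stand-in for ProcessingContext<T>; replace() exists only for this demo.
class CastContext<T> {
    private T original;

    CastContext(T original) {
        this.original = original;
    }

    T original() {
        return original;
    }

    void replace(T newOriginal) {
        this.original = newOriginal;
    }
}

class CastDemo {
    // Returns true if the exact-type cast let a plain Rec leak into a CastContext<SpecialRec>
    // while the bounded cast could still read the original as a Rec.
    @SuppressWarnings("unchecked")
    static boolean exactCastPollutesHeap() {
        CastContext<SpecialRec> typed = new CastContext<>(new SpecialRec());
        CastContext<?> erased = typed;

        // Bounded cast: reading as Rec works, and bounded.replace(new Rec()) would not compile.
        CastContext<? extends Rec> bounded = (CastContext<? extends Rec>) erased;
        Rec readable = bounded.original();

        // Exact-type cast: compiles (unchecked) and permits writing a plain Rec.
        CastContext<Rec> exact = (CastContext<Rec>) erased;
        exact.replace(new Rec());

        try {
            SpecialRec special = typed.original(); // compiler-inserted cast fails here
            return special == null; // only reached if the heap was not polluted
        } catch (ClassCastException e) {
            return readable instanceof SpecialRec;
        }
    }
}
```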
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements AutoCloseable {
private final ErrorHandlingMetrics errorHandlingMetrics;
private final CountDownLatch stopRequestedLatch;
private volatile boolean stopping; // indicates whether the operator has been asked to stop retrying
-
- protected final ProcessingContext context;
+ private List<ErrorReporter> reporters;
public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
ToleranceType toleranceType, Time time,
ErrorHandlingMetrics errorHandlingMetrics) {
- this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+ this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new CountDownLatch(1));
}
RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
ToleranceType toleranceType, Time time,
ErrorHandlingMetrics errorHandlingMetrics,
- ProcessingContext context, CountDownLatch stopRequestedLatch) {
+ CountDownLatch stopRequestedLatch) {
this.errorRetryTimeout = errorRetryTimeout;
this.errorMaxDelayInMillis = errorMaxDelayInMillis;
this.errorToleranceType = toleranceType;
this.time = time;
this.errorHandlingMetrics = errorHandlingMetrics;
- this.context = context;
this.stopRequestedLatch = stopRequestedLatch;
this.stopping = false;
+ this.reporters = Collections.emptyList();
}
- public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
- ConsumerRecord<byte[], byte[]> consumerRecord,
- Throwable error) {
-
+ public Future<Void> executeFailed(ProcessingContext<?> context, Stage stage, Class<?> executingClass, Throwable error) {
markAsFailed();
- context.consumerRecord(consumerRecord);
context.currentContext(stage, executingClass);
context.error(error);
errorHandlingMetrics.recordFailure();
- Future<Void> errantRecordFuture = context.report();
+ Future<Void> errantRecordFuture = report(context);
if (!withinToleranceLimits()) {
errorHandlingMetrics.recordError();
throw new ConnectException("Tolerance exceeded in error handler", error);
}
return errantRecordFuture;
}
- public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
- SourceRecord sourceRecord,
- Throwable error) {
Review Comment:
Just to make sure I'm following along: this was removed because it was made
redundant by making `ProcessingContext` generic, and all previous call sites
for either variant of `executeFailed` now call the single remaining variant,
correct?
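A minimal model of why one `executeFailed(ProcessingContext<?> ...)` can cover both former overloads: once the context is generic over its original record, a wildcard parameter accepts sink and source contexts alike. `GenericContext` and `FailureHandler` are hypothetical stand-ins, and the string and integer "records" are placeholders rather than the Connect runtime's types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ProcessingContext<T>.
class GenericContext<T> {
    private final T original;
    private Throwable error;

    GenericContext(T original) {
        this.original = original;
    }

    T original() {
        return original;
    }

    void error(Throwable error) {
        this.error = error;
    }

    Throwable error() {
        return error;
    }
}

// Hypothetical stand-in for RetryWithToleranceOperator with a single executeFailed.
class FailureHandler {
    final List<Object> reported = new ArrayList<>();

    // Accepts a context over any record type; no per-record-type overload is needed.
    void executeFailed(GenericContext<?> context, Throwable error) {
        context.error(error);
        reported.add(context.original());
    }
}
```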
--
This is an automated message from the Apache Git Service.