gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449326220
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -80,59 +83,57 @@ public class RetryWithToleranceOperator implements
AutoCloseable {
private final ErrorHandlingMetrics errorHandlingMetrics;
private final CountDownLatch stopRequestedLatch;
private volatile boolean stopping; // indicates whether the operator has
been asked to stop retrying
-
- protected final ProcessingContext context;
+ private List<ErrorReporter> reporters;
public RetryWithToleranceOperator(long errorRetryTimeout, long
errorMaxDelayInMillis,
ToleranceType toleranceType, Time time,
ErrorHandlingMetrics errorHandlingMetrics) {
- this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time,
errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
+ this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time,
errorHandlingMetrics, new CountDownLatch(1));
}
RetryWithToleranceOperator(long errorRetryTimeout, long
errorMaxDelayInMillis,
ToleranceType toleranceType, Time time,
ErrorHandlingMetrics errorHandlingMetrics,
- ProcessingContext context, CountDownLatch
stopRequestedLatch) {
+ CountDownLatch stopRequestedLatch) {
this.errorRetryTimeout = errorRetryTimeout;
this.errorMaxDelayInMillis = errorMaxDelayInMillis;
this.errorToleranceType = toleranceType;
this.time = time;
this.errorHandlingMetrics = errorHandlingMetrics;
- this.context = context;
this.stopRequestedLatch = stopRequestedLatch;
this.stopping = false;
+ this.reporters = Collections.emptyList();
}
- public synchronized Future<Void> executeFailed(Stage stage, Class<?>
executingClass,
- ConsumerRecord<byte[], byte[]>
consumerRecord,
- Throwable error) {
-
+ public Future<Void> executeFailed(ProcessingContext<?> context, Stage
stage, Class<?> executingClass, Throwable error) {
markAsFailed();
- context.consumerRecord(consumerRecord);
context.currentContext(stage, executingClass);
context.error(error);
errorHandlingMetrics.recordFailure();
- Future<Void> errantRecordFuture = context.report();
+ Future<Void> errantRecordFuture = report(context);
if (!withinToleranceLimits()) {
errorHandlingMetrics.recordError();
throw new ConnectException("Tolerance exceeded in error handler",
error);
}
return errantRecordFuture;
}
- public synchronized Future<Void> executeFailed(Stage stage, Class<?>
executingClass,
- SourceRecord sourceRecord,
- Throwable error) {
Review Comment:
Yep, there were two nearly-identical implementations that differed in the
type of record they accepted.
The only other difference was the ConnectException message, and when merging
them I just kept the more generic message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]