gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449513344
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ProcessingContext.java:
##########
@@ -168,11 +102,13 @@ public String toString(boolean includeMessage) {
builder.append("' with class '");
builder.append(executingClass() == null ? "null" :
executingClass().getName());
builder.append('\'');
- if (includeMessage && sourceRecord() != null) {
+ T original = original();
+ if (includeMessage && original instanceof SourceRecord) {
Review Comment:
Ill expand on this, since it also provides context for the strange casting
in DeadLetterQueueReporter.
I think the strange "if source do [x], if sink do [y]" in
`ProcessingContext#toString(boolean)` and `DeadLetterQueueReporter#report()`
are both symptoms of the inadequate ErrorReporter signature. Since there's only
one ErrorReporter signature used by both sources and sinks, there isn't
anything in the type definitions to force the DeadLetterQueueReporter to only
work with sinks, so it has to include a runtime check in the implementation.
Similarly, there are two different "kinds" of LogReporter, the one for
sources that hits the first branch in the toString, and one that hits the
second. Each LogReporter instance only ever takes one of the branches, but that
isn't clear from the implementation. If we make ErrorReporter generic, we can
have the implementations give functionality for the specific record types when
compiling or instantiating, instead of in the report() function.
But pulling on this thread unravelled a bit more: To make ErrorReporter
generic, we have to make RetryWithToleranceOperator generic, make LogReporter
generic, change some RetryWithToleranceOperator instance reuse, and move the
RetryWithToleranceOperator out of WorkerTask. All of which are just lateral
refactors, and I think would be easier to address in a follow-up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]