[
https://issues.apache.org/jira/browse/KAFKA-16858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850530#comment-17850530
]
Greg Harris commented on KAFKA-16858:
-------------------------------------
Thanks so much for the additional context, [~undone]. This is a very odd bug.
I managed to reproduce a Flatten NPE with mutable input objects rather than
mocking:
{noformat}
xformValue.configure(Collections.emptyMap());
Schema innerSchema = SchemaBuilder.struct().optional();
Struct innerStruct = new Struct(innerSchema);
// Mutable holder, so the inner array's valueSchema() can change after the Struct is built
AtomicReference<Schema> valueSchema = new AtomicReference<>(innerSchema);
Schema arraySchema = SchemaBuilder.array(new ConnectSchema(Schema.Type.ARRAY) {
    @Override
    public Schema valueSchema() {
        return valueSchema.get();
    }
});
Schema schema = SchemaBuilder.struct().field("field", arraySchema);
Struct value = new Struct(schema).put("field",
        Collections.singletonList(Collections.singletonList(innerStruct)));
// Null out the value schema only after the input Struct has been constructed
valueSchema.set(null);
SourceRecord record = new SourceRecord(null, null, "topic", 0, schema, value);
xformValue.apply(record);
{noformat}
It throws this error:
{noformat}
Cannot invoke "org.apache.kafka.connect.data.Schema.name()" because "schema" is null
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.name()" because "schema" is null
    at org.apache.kafka.connect.data.ConnectSchema.expectedClassesFor(ConnectSchema.java:268)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:224)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
    at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:250)
    at org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:164)
    at org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:79)
{noformat}
This is different from your stacktrace in two ways:
# Instead of 4 buildWithSchema calls, there's only 1. This is because my test
Struct is nested less deeply than your Struct; a deeper test case behaves
almost identically.
# Instead of 3 validateValue calls, there are 5. This is because I'm using a
doubly-nested array "[[Struct]]" instead of your singly-nested array "[Struct]".
This one is a bit more important, because the reproduction case doesn't work
for singly-nested arrays. The difference is that a singly-nested array has its
valueSchema evaluated during `buildUpdatedSchema`, and the doubly-nested array
has its valueSchema evaluated during `buildWithSchema`. When the null
valueSchema is evaluated during `buildUpdatedSchema`, it throws this exception
instead:
{noformat}
valueSchema cannot be null.
org.apache.kafka.connect.errors.SchemaBuilderException: valueSchema cannot be null.
    at app//org.apache.kafka.connect.data.SchemaBuilder.array(SchemaBuilder.java:363)
    at app//org.apache.kafka.connect.transforms.util.SchemaUtil.copySchemaBasics(SchemaUtil.java:29)
    at app//org.apache.kafka.connect.transforms.Flatten.convertFieldSchema(Flatten.java:225)
    at app//org.apache.kafka.connect.transforms.Flatten.buildUpdatedSchema(Flatten.java:201)
    at app//org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:156)
    at app//org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:79)
{noformat}
I was unable to reproduce this with non-mutable schemas; they trigger the NPE
too early, while the input Struct is being constructed.
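To illustrate why mutability matters here, the following is a minimal, self-contained sketch that does not use the Kafka Connect classes. MiniSchema, fixed, backedBy and hasElementSchema are hypothetical stand-ins invented for this example; the only point is that a schema reading its element schema through a reference can pass a check while the value is built and fail the same check later.

```java
import java.util.concurrent.atomic.AtomicReference;

public class MutableSchemaSketch {
    /** Hypothetical stand-in for a schema; NOT the real Connect Schema interface. */
    interface MiniSchema {
        MiniSchema valueSchema(); // element schema for arrays, null for leaves
    }

    /** Immutable variant: the element schema is fixed when the schema is created. */
    static MiniSchema fixed(MiniSchema element) {
        return () -> element;
    }

    /** Mutable variant: the element schema is re-read from the reference on every call. */
    static MiniSchema backedBy(AtomicReference<MiniSchema> ref) {
        return ref::get;
    }

    /** Stand-in for any validation step that needs the element schema to be present. */
    static boolean hasElementSchema(MiniSchema array) {
        return array.valueSchema() != null;
    }

    public static void main(String[] args) {
        MiniSchema leaf = () -> null;
        AtomicReference<MiniSchema> ref = new AtomicReference<>(leaf);
        MiniSchema array = backedBy(ref);

        // While the input value is being built, the element schema is present.
        System.out.println(hasElementSchema(array));

        // The schema is mutated afterwards, so a later validation sees null.
        ref.set(null);
        System.out.println(hasElementSchema(array));

        // An immutable schema gives the same answer at both points in time.
        System.out.println(hasElementSchema(fixed(leaf)));
    }
}
```

With an immutable schema the answer cannot change between construction and transformation, which is consistent with the observation that such schemas fail while the input is being constructed rather than inside the transform.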
Some follow-ups:
* Are you able to provide an anonymized form of your schema directly, rather
than just a high-level "Array of Structs"? I'm wondering if your schema is
capable of triggering the use of the mutable SchemaWrapper
[https://github.com/confluentinc/schema-registry/blob/7b886f309c83041d4f2a5b41b5910f3b8002413a/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java#L1779]
inside the ProtobufConverter.
* I don't have an explanation of how this can happen for empty and non-present
arrays, as it looks like validateValue(ConnectSchema:255) can only be triggered
by non-empty lists.
* w.r.t. the variable validateValue depth: Are you saying that in _error
cases_ the recursion depth is unpredictable, or in general? validateValue
should be called at every or almost every location in the tree of values, so I
would expect to see lots of different recursion depths. Maybe you can share
some more stacktraces as examples.
* So far in this investigation, I'm trying to find the source of the null in
hopes that we can prevent it, and get well-formed data to the Flatten SMT.
Regardless of the result of that investigation, I think we can consider this
input malformed, and throw an intentional DataException instead of
NullPointerException. Would that be an acceptable solution for you, or does
this data need to make it all the way through the pipeline?
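As a rough illustration of the last follow-up, here is a self-contained sketch of rejecting a null element schema with an intentional exception instead of letting a NullPointerException escape. It is not a patch against Kafka: MiniSchema, schema, validateArray and the nested DataException are hypothetical stand-ins invented for this example, and where such a check would live in the real code is an open question.

```java
import java.util.Collections;
import java.util.List;

public class NullSchemaGuardSketch {
    /** Hypothetical stand-in for org.apache.kafka.connect.errors.DataException. */
    static class DataException extends RuntimeException {
        DataException(String message) {
            super(message);
        }
    }

    /** Hypothetical, minimal stand-in for a schema; NOT the real Connect interface. */
    interface MiniSchema {
        String name();
        MiniSchema valueSchema(); // element schema for arrays, may be null if malformed
    }

    /** Builds a stand-in schema with the given name and element schema. */
    static MiniSchema schema(String schemaName, MiniSchema elementSchema) {
        return new MiniSchema() {
            @Override
            public String name() {
                return schemaName;
            }

            @Override
            public MiniSchema valueSchema() {
                return elementSchema;
            }
        };
    }

    /** Checks each element, failing deliberately when the element schema is missing. */
    static void validateArray(String field, MiniSchema arraySchema, List<?> values) {
        MiniSchema elementSchema = arraySchema.valueSchema();
        for (Object ignored : values) {
            if (elementSchema == null) {
                throw new DataException(
                        "Array field \"" + field + "\" has a null value schema");
            }
            // Per-element validation against elementSchema would go here.
        }
    }

    public static void main(String[] args) {
        MiniSchema broken = schema("array", null);

        // An empty list has no elements to validate, so nothing is thrown.
        validateArray("field", broken, Collections.emptyList());

        try {
            validateArray("field", broken, Collections.singletonList("element"));
        } catch (DataException e) {
            System.out.println("Rejected malformed input: " + e.getMessage());
        }
    }
}
```

The check runs per element, so an empty list never reaches it, mirroring the observation above that the failing validation only appears reachable for non-empty lists.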
> Flatten SMT throws NPE
> ----------------------
>
> Key: KAFKA-16858
> URL: https://issues.apache.org/jira/browse/KAFKA-16858
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Affects Versions: 3.6.0
> Environment: Kafka 3.6 by way of CP 7.6.0
> Reporter: Adam Strickland
> Priority: Major
> Attachments: FlattenTest.java
>
>
> {{ConnectSchema.expectedClassesFor}} sometimes will throw an NPE as part of a
> call to an SMT chain. Stack trace snippet:
> {{at com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.apply(MomentFlatten.java:84)}}
> {{at com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.applyWithSchema(MomentFlatten.java:173)}}
> {{at com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}}
> {{at com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}}
> {{at com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}}
> {{at com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:274)}}
> {{at org.apache.kafka.connect.data.Struct.put(Struct.java:203)}}
> {{at org.apache.kafka.connect.data.Struct.put(Struct.java:216)}}
> {{at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)}}
> {{at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)}}
> {{at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:224)}}
> {{at org.apache.kafka.connect.data.ConnectSchema.expectedClassesFor(ConnectSchema.java:268)}}
> (the above transform is a sub-class of
> {{o.a.k.connect.transforms.Flatten}}; have confirmed that the error
> occurs regardless).
> The field being transformed is an array of structs. If the call to
> {{Schema#valueSchema()}} (o.a.k.connect.data.ConnectSchema.java:255) returns
> {{null}}, the subsequent call to {{Schema#name()}} at
> o.a.k.connect.data.ConnectSchema:268 throws an NPE.
> The strange thing that we have observed is that this doesn't always happen;
> *sometimes* the struct's schema is found and sometimes it is not. We have
> been unable to determine the root cause, but have constructed a test that
> replicates the problem as observed (see attachment).
> In our case we have worked around the issue with the aforementioned sub-class
> of {{Flatten}}, catching and logging the NPE on that specific use-case.