lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661795305
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -27,70 +32,83 @@
public class WrappingNullableUtils {
@SuppressWarnings("unchecked")
- private static <T> Deserializer<T> prepareDeserializer(final
Deserializer<T> specificDeserializer, final Deserializer<?>
contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final
boolean isKey) {
- Deserializer<T> deserializerToUse = specificDeserializer;
- if (deserializerToUse == null) {
+ private static <T> Deserializer<T> prepareDeserializer(final
Deserializer<T> specificDeserializer, final ProcessorContext context, final
boolean isKey, final String name) {
+ final Deserializer<T> deserializerToUse;
+
+ if (specificDeserializer == null) {
+ final Deserializer<?> contextKeyDeserializer =
context.keySerde().deserializer();
+ final Deserializer<?> contextValueDeserializer =
context.valueSerde().deserializer();
deserializerToUse = (Deserializer<T>) (isKey ?
contextKeyDeserializer : contextValueDeserializer);
} else {
- initNullableDeserializer(deserializerToUse,
contextKeyDeserializer, contextValueDeserializer);
+ deserializerToUse = specificDeserializer;
}
+ initNullableDeserializer(deserializerToUse, context);
return deserializerToUse;
}
@SuppressWarnings("unchecked")
- private static <T> Serializer<T> prepareSerializer(final Serializer<T>
specificSerializer, final Serializer<?> contextKeySerializer, final
Serializer<?> contextValueSerializer, final boolean isKey) {
- Serializer<T> serializerToUse = specificSerializer;
- if (serializerToUse == null) {
+ private static <T> Serializer<T> prepareSerializer(final Serializer<T>
specificSerializer, final ProcessorContext context, final boolean isKey, final
String name) {
+ final Serializer<T> serializerToUse;
+ if (specificSerializer == null) {
+ final Serializer<?> contextKeySerializer =
context.keySerde().serializer();
+ final Serializer<?> contextValueSerializer =
context.valueSerde().serializer();
serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer :
contextValueSerializer);
} else {
- initNullableSerializer(serializerToUse, contextKeySerializer,
contextValueSerializer);
+ serializerToUse = specificSerializer;
}
+ initNullableSerializer(serializerToUse, context);
return serializerToUse;
}
@SuppressWarnings({"rawtypes", "unchecked"})
- private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde,
final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean
isKey) {
- Serde<T> serdeToUse = specificSerde;
+ private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde,
final SerdeGetter getter, final boolean isKey) {
+ final Serde<T> serdeToUse;
+ if (specificSerde == null) {
+ serdeToUse = (Serde<T>) (isKey ? getter.keySerde() :
getter.valueSerde());
+ } else {
+ serdeToUse = specificSerde;
+ }
if (serdeToUse == null) {
- serdeToUse = (Serde<T>) (isKey ? contextKeySerde :
contextValueSerde);
+ final String serde = isKey ? "key" : "value";
+ throw new ConfigException("Please specify a " + serde + " serde or
set one through StreamsConfig#DEFAULT_" + serde.toUpperCase(Locale.ROOT) +
"_SERDE_CLASS_CONFIG");
} else if (serdeToUse instanceof WrappingNullableSerde) {
- ((WrappingNullableSerde) serdeToUse).setIfUnset(contextKeySerde,
contextValueSerde);
+ ((WrappingNullableSerde) serdeToUse).setIfUnset(getter);
Review comment:
This is causing issues - even though `contextKeySerde` was `Serde<?>`,
some of the underlying `setIfUnset` methods are complaining when I access the
serde straight from the `getter`.
In `StreamStreamJoinIntegrationTest#testOuterRepartitioned` I get a cast
exception:
`class org.apache.kafka.common.serialization.LongDeserializer cannot be cast
to class org.apache.kafka.common.serialization.Serializer
(org.apache.kafka.common.serialization.LongDeserializer and
org.apache.kafka.common.serialization.Serializer are in unnamed module of
loader 'app')
`
It seems like because we aren't passing in the serde directly the generics
aren't resolving right...... This issue is only popping up in the
`StreamStreamJoinIntegrationTest` so maybe it's something with the test set-up?
But I'm kinda at a loss about how to get it to stop complaining ☹️
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]