mjsax commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661896406
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -27,70 +32,83 @@
public class WrappingNullableUtils {
@SuppressWarnings("unchecked")
- private static <T> Deserializer<T> prepareDeserializer(final
Deserializer<T> specificDeserializer, final Deserializer<?>
contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final
boolean isKey) {
- Deserializer<T> deserializerToUse = specificDeserializer;
- if (deserializerToUse == null) {
+ private static <T> Deserializer<T> prepareDeserializer(final
Deserializer<T> specificDeserializer, final ProcessorContext context, final
boolean isKey, final String name) {
+ final Deserializer<T> deserializerToUse;
+
+ if (specificDeserializer == null) {
+ final Deserializer<?> contextKeyDeserializer =
context.keySerde().deserializer();
+ final Deserializer<?> contextValueDeserializer =
context.valueSerde().deserializer();
deserializerToUse = (Deserializer<T>) (isKey ?
contextKeyDeserializer : contextValueDeserializer);
} else {
- initNullableDeserializer(deserializerToUse,
contextKeyDeserializer, contextValueDeserializer);
+ deserializerToUse = specificDeserializer;
+ initNullableDeserializer(deserializerToUse, new
SerdeGetter(context));
}
return deserializerToUse;
}
@SuppressWarnings("unchecked")
- private static <T> Serializer<T> prepareSerializer(final Serializer<T>
specificSerializer, final Serializer<?> contextKeySerializer, final
Serializer<?> contextValueSerializer, final boolean isKey) {
- Serializer<T> serializerToUse = specificSerializer;
- if (serializerToUse == null) {
+ private static <T> Serializer<T> prepareSerializer(final Serializer<T>
specificSerializer, final ProcessorContext context, final boolean isKey, final
String name) {
+ final Serializer<T> serializerToUse;
+ if (specificSerializer == null) {
+ final Serializer<?> contextKeySerializer =
context.keySerde().serializer();
+ final Serializer<?> contextValueSerializer =
context.valueSerde().serializer();
serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer :
contextValueSerializer);
} else {
- initNullableSerializer(serializerToUse, contextKeySerializer,
contextValueSerializer);
+ serializerToUse = specificSerializer;
+ initNullableSerializer(serializerToUse, new SerdeGetter(context));
}
return serializerToUse;
}
@SuppressWarnings({"rawtypes", "unchecked"})
- private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde,
final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean
isKey) {
- Serde<T> serdeToUse = specificSerde;
+ private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde,
final SerdeGetter getter, final boolean isKey) {
+ final Serde<T> serdeToUse;
+ if (specificSerde == null) {
+ serdeToUse = (Serde<T>) (isKey ? getter.keySerde() :
getter.valueSerde());
+ } else {
+ serdeToUse = specificSerde;
+ }
if (serdeToUse == null) {
Review comment:
I think this condition cannot be true any longer?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]