lct45 commented on a change in pull request #10813:
URL: https://github.com/apache/kafka/pull/10813#discussion_r661795305



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java
##########
@@ -27,70 +32,83 @@
 public class WrappingNullableUtils {
 
     @SuppressWarnings("unchecked")
-    private static <T> Deserializer<T> prepareDeserializer(final 
Deserializer<T> specificDeserializer, final Deserializer<?> 
contextKeyDeserializer, final Deserializer<?> contextValueDeserializer, final 
boolean isKey) {
-        Deserializer<T> deserializerToUse = specificDeserializer;
-        if (deserializerToUse == null) {
+    private static <T> Deserializer<T> prepareDeserializer(final 
Deserializer<T> specificDeserializer, final ProcessorContext context, final 
boolean isKey, final String name) {
+        final Deserializer<T> deserializerToUse;
+
+        if (specificDeserializer == null) {
+            final Deserializer<?> contextKeyDeserializer = 
context.keySerde().deserializer();
+            final Deserializer<?> contextValueDeserializer = 
context.valueSerde().deserializer();
             deserializerToUse = (Deserializer<T>) (isKey ? 
contextKeyDeserializer : contextValueDeserializer);
         } else {
-            initNullableDeserializer(deserializerToUse, 
contextKeyDeserializer, contextValueDeserializer);
+            deserializerToUse = specificDeserializer;
         }
+        initNullableDeserializer(deserializerToUse, context);
         return deserializerToUse;
     }
     @SuppressWarnings("unchecked")
-    private static <T> Serializer<T> prepareSerializer(final Serializer<T> 
specificSerializer, final Serializer<?> contextKeySerializer, final 
Serializer<?> contextValueSerializer, final boolean isKey) {
-        Serializer<T> serializerToUse = specificSerializer;
-        if (serializerToUse == null) {
+    private static <T> Serializer<T> prepareSerializer(final Serializer<T> 
specificSerializer, final ProcessorContext context, final boolean isKey, final 
String name) {
+        final Serializer<T> serializerToUse;
+        if (specificSerializer == null) {
+            final Serializer<?> contextKeySerializer = 
context.keySerde().serializer();
+            final Serializer<?> contextValueSerializer = 
context.valueSerde().serializer();
             serializerToUse = (Serializer<T>) (isKey ? contextKeySerializer : 
contextValueSerializer);
         } else {
-            initNullableSerializer(serializerToUse, contextKeySerializer, 
contextValueSerializer);
+            serializerToUse = specificSerializer;
         }
+        initNullableSerializer(serializerToUse, context);
         return serializerToUse;
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, 
final Serde<?> contextKeySerde, final Serde<?> contextValueSerde, final boolean 
isKey) {
-        Serde<T> serdeToUse = specificSerde;
+    private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, 
final SerdeGetter getter, final boolean isKey) {
+        final Serde<T> serdeToUse;
+        if (specificSerde == null) {
+            serdeToUse = (Serde<T>) (isKey ?  getter.keySerde() : 
getter.valueSerde());
+        } else {
+            serdeToUse = specificSerde;
+        }
         if (serdeToUse == null) {
-            serdeToUse = (Serde<T>) (isKey ?  contextKeySerde : 
contextValueSerde);
+            final String serde = isKey ? "key" : "value";
+            throw new ConfigException("Please specify a " + serde + " serde or 
set one through StreamsConfig#DEFAULT_" + serde.toUpperCase(Locale.ROOT) + 
"_SERDE_CLASS_CONFIG");
         } else if (serdeToUse instanceof WrappingNullableSerde) {
-            ((WrappingNullableSerde) serdeToUse).setIfUnset(contextKeySerde, 
contextValueSerde);
+            ((WrappingNullableSerde) serdeToUse).setIfUnset(getter);

Review comment:
       This `setIfUnset(getter)` call is causing issues: even though 
`contextKeySerde` was `Serde<?>`, some of the underlying `setIfUnset` methods 
are complaining when I access the serde straight from the `getter`.
   
   In `StreamStreamJoinIntegrationTest#testOuterRepartitioned` I get a cast 
exception:
   `class org.apache.kafka.common.serialization.LongDeserializer cannot be cast 
to class org.apache.kafka.common.serialization.Serializer 
(org.apache.kafka.common.serialization.LongDeserializer and 
org.apache.kafka.common.serialization.Serializer are in unnamed module of 
loader 'app')`
   
   It seems that, because we aren't passing in the serde directly, the generics 
aren't resolving correctly. This issue only shows up in 
`StreamStreamJoinIntegrationTest`, so maybe it's something with the test setup? 
But I'm at a loss about how to get it to stop complaining ☹️




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to